@@ -37,9 +37,10 @@ export class WalletAddressModel extends BaseModel<IWalletAddress> {
37
37
return JSON . stringify ( transform ) ;
38
38
}
39
39
40
- async updateCoins ( params : { wallet : IWallet ; addresses : string [ ] } ) {
41
- const { wallet, addresses } = params ;
40
+ async updateCoins ( params : { wallet : IWallet ; addresses : string [ ] ; opts ?: { reprocess ?: boolean } } ) {
41
+ const { wallet, addresses, opts } = params ;
42
42
const { chain, network } = wallet ;
43
+ const { reprocess } = opts || { } ;
43
44
44
45
class AddressInputStream extends Readable {
45
46
addressBatches : string [ ] [ ] ;
@@ -65,19 +66,19 @@ export class WalletAddressModel extends BaseModel<IWalletAddress> {
65
66
}
66
67
async _transform ( addressBatch , _ , callback ) {
67
68
try {
68
- let exists = (
69
- await WalletAddressStorage . collection
70
- . find ( { chain , network , wallet : wallet . _id , address : { $in : addressBatch } } )
71
- . project ( { address : 1 , processed : 1 } )
72
- . toArray ( )
73
- )
74
- . filter ( walletAddress => walletAddress . processed )
75
- . map ( walletAddress => walletAddress . address ) ;
76
- this . push (
77
- addressBatch . filter ( address => {
78
- return ! exists . includes ( address ) ;
79
- } )
80
- ) ;
69
+ if ( reprocess ) {
70
+ this . push ( addressBatch ) ;
71
+ } else {
72
+ const exists = (
73
+ await WalletAddressStorage . collection
74
+ . find ( { chain , network , wallet : wallet . _id , address : { $in : addressBatch } } )
75
+ . project ( { address : 1 , processed : 1 } )
76
+ . toArray ( )
77
+ )
78
+ . filter ( walletAddress => walletAddress . processed )
79
+ . map ( walletAddress => walletAddress . address ) ;
80
+ this . push ( addressBatch . filter ( address => ! exists . includes ( address ) ) ) ;
81
+ }
81
82
callback ( ) ;
82
83
} catch ( err ) {
83
84
callback ( err ) ;
@@ -95,15 +96,13 @@ export class WalletAddressModel extends BaseModel<IWalletAddress> {
95
96
}
96
97
try {
97
98
await WalletAddressStorage . collection . bulkWrite (
98
- addressBatch . map ( address => {
99
- return {
100
- insertOne : {
101
- document : { chain, network, wallet : wallet . _id , address, processed : false }
102
- }
103
- } ;
104
- } )
105
- ) ,
106
- { ordered : false } ;
99
+ addressBatch . map ( address => ( {
100
+ insertOne : {
101
+ document : { chain, network, wallet : wallet . _id , address, processed : false }
102
+ }
103
+ } ) ) ,
104
+ { ordered : false }
105
+ ) ;
107
106
} catch ( err : any ) {
108
107
// Ignore duplicate keys, they may be half processed
109
108
if ( err . code !== 11000 ) {
@@ -125,14 +124,12 @@ export class WalletAddressModel extends BaseModel<IWalletAddress> {
125
124
}
126
125
try {
127
126
await CoinStorage . collection . bulkWrite (
128
- addressBatch . map ( address => {
129
- return {
130
- updateMany : {
131
- filter : { chain, network, address } ,
132
- update : { $addToSet : { wallets : wallet . _id } }
133
- }
134
- } ;
135
- } ) ,
127
+ addressBatch . map ( address => ( {
128
+ updateMany : {
129
+ filter : { chain, network, address } ,
130
+ update : { $addToSet : { wallets : wallet . _id } }
131
+ }
132
+ } ) ) ,
136
133
{ ordered : false }
137
134
) ;
138
135
this . push ( addressBatch ) ;
0 commit comments