@@ -20,18 +20,18 @@ import {
20
20
import { unixToDate } from '../../../../utils/convert' ;
21
21
import { StatsUtil } from '../../../../utils/stats' ;
22
22
import MoralisAPI from '../../external/providers/moralis' ;
23
- import { ExternalApiStream , MergedStream , ParseTransform } from '../../external/streams/apiStream' ;
23
+ import { ExternalApiStream , MergedStream , ParseStream } from '../../external/streams/apiStream' ;
24
24
import { NodeQueryStream } from '../../external/streams/nodeStream' ;
25
25
import { InternalStateProvider } from '../../internal/internal' ;
26
26
import { EVMTransactionStorage } from '../models/transaction' ;
27
27
import { EVMTransactionJSON } from '../types' ;
28
28
import { BaseEVMStateProvider } from './csp' ;
29
+ import { PopulateReceiptTransform } from './populateReceiptTransform' ;
29
30
import {
30
31
getProvider ,
31
32
isValidProviderType
32
33
} from './provider' ;
33
34
import { EVMListTransactionsStream } from './transform' ;
34
- import { PopulateReceiptTransform } from './populateReceiptTransform' ;
35
35
36
36
37
37
export interface GetWeb3Response { rpc : CryptoRpc ; web3 : Web3 ; dataType : string }
@@ -268,16 +268,10 @@ export class BaseEVMExternalStateProvider extends InternalStateProvider implemen
268
268
const txStreams : Readable [ ] = [ ] ;
269
269
const ethTransactionTransform = new EVMListTransactionsStream ( walletAddresses ) ;
270
270
const populateReceipt = new PopulateReceiptTransform ( ) ;
271
- const parseStrings = new ParseTransform ( ) ;
271
+ const parseStream = new ParseStream ( ) ;
272
272
const mergedStream = new MergedStream ( ) ; // Stream to combine the output of multiple streams
273
273
const resultStream = new MergedStream ( ) ; // Stream to write to the res object
274
274
275
- // Transform transactions proccessed through merged stream
276
- mergedStream
277
- . pipe ( parseStrings )
278
- . pipe ( populateReceipt )
279
- . pipe ( ethTransactionTransform )
280
- . pipe ( resultStream ) ;
281
275
// Tip height used to calculate confirmations
282
276
args . tipHeight = tip ? tip . height : 0 ;
283
277
// Defaults to pulling only the first 10 transactions per address
@@ -287,7 +281,13 @@ export class BaseEVMExternalStateProvider extends InternalStateProvider implemen
287
281
}
288
282
// Pipe all txStreams to the mergedStream
289
283
ExternalApiStream . mergeStreams ( txStreams , mergedStream ) ;
290
- // Ensure mergeStream resolves
284
+ // Transform transactions proccessed through merged stream
285
+ mergedStream
286
+ . pipe ( parseStream )
287
+ . pipe ( populateReceipt )
288
+ . pipe ( ethTransactionTransform )
289
+ . pipe ( resultStream ) ;
290
+ // Ensure resultStream resolves
291
291
const result = await ExternalApiStream . onStream ( resultStream , req ! , res ! , { jsonl : true } ) ;
292
292
if ( result ?. success === false ) {
293
293
logger . error ( 'Error mid-stream (streamWalletTransactions): %o' , result . error ?. log || result . error ) ;
0 commit comments