Skip to content

Commit f0ed660

Browse files
committed
transform wallet transactions
1 parent 800ae8f commit f0ed660

File tree

2 files changed

+41
-13
lines changed

2 files changed

+41
-13
lines changed

packages/bitcore-node/src/providers/chain-state/evm/api/ecsp.ts

+20-10
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import {
2020
import { unixToDate } from '../../../../utils/convert';
2121
import { StatsUtil } from '../../../../utils/stats';
2222
import MoralisAPI from '../../external/providers/moralis';
23-
import { ExternalApiStream, MergedStream } from '../../external/streams/apiStream';
23+
import { ExternalApiStream, MergedStream, ParseTransform } from '../../external/streams/apiStream';
2424
import { NodeQueryStream } from '../../external/streams/nodeStream';
2525
import { InternalStateProvider } from '../../internal/internal';
2626
import { EVMTransactionStorage } from '../models/transaction';
@@ -30,6 +30,8 @@ import {
3030
getProvider,
3131
isValidProviderType
3232
} from './provider';
33+
import { EVMListTransactionsStream } from './transform';
34+
import { PopulateReceiptTransform } from './populateReceiptTransform';
3335

3436

3537
export interface GetWeb3Response { rpc: CryptoRpc; web3: Web3; dataType: string }
@@ -261,24 +263,32 @@ export class BaseEVMExternalStateProvider extends InternalStateProvider implemen
261263
throw new Error('Missing wallet');
262264
}
263265
const chainId = await this.getChainId({ network });
264-
// Calculate confirmations with tip height
265266
const tip = await this.getLocalTip(params);
266-
args.tipHeight = tip ? tip.height : 0;
267-
const walletAddresses = (await this.getWalletAddresses(wallet._id!)).map(addy => addy.address);
268-
const mergedStream = new MergedStream();
267+
const walletAddresses = (await this.getWalletAddresses(wallet._id!)).map(addy => addy.address.toLowerCase());
269268
const txStreams: Readable[] = [];
270-
// Only mergedStream writes to res object
271-
const _mergedStream = ExternalApiStream.onStream(mergedStream, req!, res!, { jsonl: true });
272-
273-
// Default to pulling only the first 10 transactions per address
269+
const ethTransactionTransform = new EVMListTransactionsStream(walletAddresses);
270+
const populateReceipt = new PopulateReceiptTransform();
271+
const parseStrings = new ParseTransform();
272+
const mergedStream = new MergedStream(); // Stream to combine the output of multiple streams
273+
const resultStream = new MergedStream(); // Stream to write to the res object
274+
275+
// Transform transactions proccessed through merged stream
276+
mergedStream
277+
.pipe(parseStrings)
278+
.pipe(populateReceipt)
279+
.pipe(ethTransactionTransform)
280+
.pipe(resultStream);
281+
// Tip height used to calculate confirmations
282+
args.tipHeight = tip ? tip.height : 0;
283+
// Defaults to pulling only the first 10 transactions per address
274284
for (let i = 0; i < walletAddresses.length; i++) {
275285
// args / query params are processed at the api provider level
276286
txStreams.push(MoralisAPI.streamTransactionsByAddress({ chainId, chain, network, address: walletAddresses[i], args }));
277287
}
278288
// Pipe all txStreams to the mergedStream
279289
ExternalApiStream.mergeStreams(txStreams, mergedStream);
280290
// Ensure mergeStream resolves
281-
const result = await _mergedStream;
291+
const result = await ExternalApiStream.onStream(resultStream, req!, res!, { jsonl: true });
282292
if (result?.success === false) {
283293
logger.error('Error mid-stream (streamWalletTransactions): %o', result.error?.log || result.error);
284294
}

packages/bitcore-node/src/providers/chain-state/external/streams/apiStream.ts

+21-3
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,12 @@ export class ExternalApiStream extends Readable {
103103
}
104104
}
105105
if (!isFirst) {
106-
res.write(',\n{"error": "An error occurred during data stream"}\n]');
106+
const errMsg = '{"error": "An error occurred during data stream"}';
107+
if (opts.jsonl) {
108+
res.write(`${errMsg}`);
109+
} else {
110+
res.write(`,\n${errMsg}\n]`);
111+
}
107112
res.end();
108113
res.destroy();
109114
return resolve({ success: false, error: err });
@@ -118,7 +123,7 @@ export class ExternalApiStream extends Readable {
118123
if (opts.jsonl) {
119124
if (isFirst) {
120125
isFirst = false;
121-
} else {
126+
} else if (!data.endsWith('\n')) {
122127
res.write('\n');
123128
}
124129
} else {
@@ -142,7 +147,7 @@ export class ExternalApiStream extends Readable {
142147
// there was no data
143148
res.write('[]');
144149
} else {
145-
res.write('\n]');
150+
res.write('\n]');
146151
}
147152
}
148153
res.end();
@@ -170,6 +175,19 @@ export class ExternalApiStream extends Readable {
170175
}
171176
}
172177

178+
export class ParseTransform extends Transform {
179+
constructor() {
180+
super({ objectMode: true });
181+
}
182+
183+
async _transform(data: any, _, done) {
184+
if (typeof data === 'string') {
185+
data = JSON.parse(data);
186+
}
187+
done(null, data);
188+
}
189+
}
190+
173191
export class MergedStream extends Transform {
174192
constructor() {
175193
super({ objectMode: true });

0 commit comments

Comments
 (0)