Skip to content

Commit ba98e8e

Browse files
committed
fix: detect block gaps when streaming from ordhook (#349)
* fix: detect gaps when streaming * fix: tests
1 parent 8db749e commit ba98e8e

File tree

6 files changed

+192
-13
lines changed

6 files changed

+192
-13
lines changed

src/pg/block-cache.ts

+2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ export class BlockCache {
2828
locations: DbLocationInsert[] = [];
2929
currentLocations = new Map<string, DbCurrentLocationInsert>();
3030
recursiveRefs = new Map<string, string[]>();
31+
revealedNumbers: number[] = [];
3132

3233
mimeTypeCounts = new Map<string, number>();
3334
satRarityCounts = new Map<string, number>();
@@ -72,6 +73,7 @@ export class BlockCache {
7273
parent: reveal.parent,
7374
timestamp: this.timestamp,
7475
});
76+
this.revealedNumbers.push(reveal.inscription_number.jubilee);
7577
this.increaseMimeTypeCount(mime_type);
7678
this.increaseSatRarityCount(satoshi.rarity);
7779
this.increaseInscriptionTypeCount(reveal.inscription_number.classic < 0 ? 'cursed' : 'blessed');

src/pg/pg-store.ts

+52-12
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,11 @@ import {
88
runMigrations,
99
stopwatch,
1010
} from '@hirosystems/api-toolkit';
11-
import { BitcoinEvent, BitcoinPayload } from '@hirosystems/chainhook-client';
11+
import {
12+
BadPayloadRequestError,
13+
BitcoinEvent,
14+
BitcoinPayload,
15+
} from '@hirosystems/chainhook-client';
1216
import * as path from 'path';
1317
import * as postgres from 'postgres';
1418
import { Order, OrderBy } from '../api/schemas';
@@ -35,6 +39,8 @@ export const INSERT_BATCH_SIZE = 4000;
3539

3640
type InscriptionIdentifier = { genesis_id: string } | { number: number };
3741

42+
class BlockAlreadyIngestedError extends Error {}
43+
3844
export class PgStore extends BasePgStore {
3945
readonly brc20: Brc20PgStore;
4046
readonly counts: CountsPgStore;
@@ -90,14 +96,17 @@ export class PgStore extends BasePgStore {
9096
);
9197
}
9298
for (const event of payload.apply) {
93-
if (await this.isBlockIngested(event)) {
94-
logger.warn(`PgStore skipping previously seen block ${event.block_identifier.index}`);
95-
continue;
96-
}
9799
logger.info(`PgStore apply block ${event.block_identifier.index}`);
98100
const time = stopwatch();
99-
await this.updateInscriptionsEvent(sql, event, 'apply', streamed);
100-
await this.brc20.updateBrc20Operations(sql, event, 'apply');
101+
try {
102+
await this.updateInscriptionsEvent(sql, event, 'apply', streamed);
103+
await this.brc20.updateBrc20Operations(sql, event, 'apply');
104+
} catch (error) {
105+
if (error instanceof BlockAlreadyIngestedError) {
106+
logger.warn(error);
107+
continue;
108+
} else throw error;
109+
}
101110
await this.updateChainTipBlockHeight(sql, event.block_identifier.index);
102111
logger.info(
103112
`PgStore apply block ${
@@ -119,6 +128,7 @@ export class PgStore extends BasePgStore {
119128
normalizedHexString(event.block_identifier.hash),
120129
event.timestamp
121130
);
131+
if (direction === 'apply') await this.assertNextBlockIsNotIngested(sql, event);
122132
for (const tx of event.transactions) {
123133
const tx_id = normalizedHexString(tx.transaction_identifier.hash);
124134
for (const operation of tx.metadata.ordinal_operations) {
@@ -138,6 +148,7 @@ export class PgStore extends BasePgStore {
138148
}
139149
switch (direction) {
140150
case 'apply':
151+
if (streamed) await this.assertNextBlockIsContiguous(sql, event, cache);
141152
await this.applyInscriptions(sql, cache, streamed);
142153
break;
143154
case 'rollback':
@@ -348,15 +359,44 @@ export class PgStore extends BasePgStore {
348359
}
349360
}
350361

351-
private async isBlockIngested(event: BitcoinEvent): Promise<boolean> {
352-
const currentBlockHeight = await this.getChainTipBlockHeight();
362+
private async assertNextBlockIsNotIngested(sql: PgSqlClient, event: BitcoinEvent) {
363+
const result = await sql<{ block_height: number }[]>`
364+
SELECT block_height::int FROM chain_tip
365+
`;
366+
if (!result.count) return false;
367+
const currentHeight = result[0].block_height;
353368
if (
354-
event.block_identifier.index <= currentBlockHeight &&
369+
event.block_identifier.index <= currentHeight &&
355370
event.block_identifier.index !== ORDINALS_GENESIS_BLOCK
356371
) {
357-
return true;
372+
throw new BlockAlreadyIngestedError(
373+
`Block ${event.block_identifier.index} is already ingested, chain tip is at ${currentHeight}`
374+
);
375+
}
376+
}
377+
378+
private async assertNextBlockIsContiguous(
379+
sql: PgSqlClient,
380+
event: BitcoinEvent,
381+
cache: BlockCache
382+
) {
383+
if (!cache.revealedNumbers.length) {
384+
// TODO: How do we check blocks with only transfers?
385+
return;
358386
}
359-
return false;
387+
const result = await sql<{ max: number | null; block_height: number }[]>`
388+
WITH tip AS (SELECT block_height::int FROM chain_tip)
389+
SELECT MAX(number)::int AS max, (SELECT block_height FROM tip)
390+
FROM inscriptions WHERE number >= 0
391+
`;
392+
if (!result.count) return;
393+
const data = result[0];
394+
const firstReveal = cache.revealedNumbers.sort()[0];
395+
if (data.max === null && firstReveal === 0) return;
396+
if ((data.max ?? 0) + 1 != firstReveal)
397+
throw new BadPayloadRequestError(
398+
`Streamed block ${event.block_identifier.index} is non-contiguous, attempting to reveal #${firstReveal} when current max is #${data.max} at block height ${data.block_height}`
399+
);
360400
}
361401

362402
private async updateChainTipBlockHeight(sql: PgSqlClient, block_height: number): Promise<void> {

tests/api/cache.test.ts

+10
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ describe('ETag cache', () => {
2121

2222
test('inscription cache control', async () => {
2323
const block = new TestChainhookPayloadBuilder()
24+
.streamingBlocks(true)
2425
.apply()
2526
.block({ height: 775617 })
2627
.transaction({ hash: '0x38c46a8bf7ec90bc7f6b797e7dc84baa97f4e5fd4286b92fe1b50176d03b18dc' })
@@ -88,6 +89,7 @@ describe('ETag cache', () => {
8889
// Perform transfer and check cache
8990
await db.updateInscriptions(
9091
new TestChainhookPayloadBuilder()
92+
.streamingBlocks(true)
9193
.apply()
9294
.block({ height: 775618, timestamp: 1678122360 })
9395
.transaction({
@@ -125,6 +127,7 @@ describe('ETag cache', () => {
125127
// Perform transfer GAP FILL and check cache
126128
await db.updateInscriptions(
127129
new TestChainhookPayloadBuilder()
130+
.streamingBlocks(true)
128131
.apply()
129132
.block({ height: 775619, timestamp: 1678122360 })
130133
.transaction({
@@ -161,6 +164,7 @@ describe('ETag cache', () => {
161164

162165
test('inscriptions index cache control', async () => {
163166
const block1 = new TestChainhookPayloadBuilder()
167+
.streamingBlocks(true)
164168
.apply()
165169
.block({ height: 778575 })
166170
.transaction({ hash: '0x9f4a9b73b0713c5da01c0a47f97c6c001af9028d6bdd9e264dfacbc4e6790201' })
@@ -194,6 +198,7 @@ describe('ETag cache', () => {
194198
.build();
195199
await db.updateInscriptions(block1);
196200
const block2 = new TestChainhookPayloadBuilder()
201+
.streamingBlocks(true)
197202
.apply()
198203
.block({ height: 778576 })
199204
.transaction({ hash: '0x00000000000000000002a90330a99f67e3f01eb2ce070b45930581e82fb7a91d' })
@@ -246,6 +251,7 @@ describe('ETag cache', () => {
246251

247252
// New location
248253
const block3 = new TestChainhookPayloadBuilder()
254+
.streamingBlocks(true)
249255
.apply()
250256
.block({ height: 778577 })
251257
.transaction({ hash: 'ae9d273a10e899f0d2cad47ee2b0e77ab8a9addd9dd5bb5e4b03d6971c060d52' })
@@ -274,6 +280,7 @@ describe('ETag cache', () => {
274280

275281
test('inscriptions stats per block cache control', async () => {
276282
const block1 = new TestChainhookPayloadBuilder()
283+
.streamingBlocks(true)
277284
.apply()
278285
.block({ height: 778575, hash: randomHash() })
279286
.transaction({ hash: '0x9f4a9b73b0713c5da01c0a47f97c6c001af9028d6bdd9e264dfacbc4e6790201' })
@@ -326,6 +333,7 @@ describe('ETag cache', () => {
326333

327334
// New block
328335
const block2 = new TestChainhookPayloadBuilder()
336+
.streamingBlocks(true)
329337
.apply()
330338
.block({ height: 778576, hash: randomHash() })
331339
.transaction({ hash: '0x00000000000000000002a90330a99f67e3f01eb2ce070b45930581e82fb7a91d' })
@@ -370,6 +378,7 @@ describe('ETag cache', () => {
370378

371379
test('status etag changes with new block', async () => {
372380
const block1 = new TestChainhookPayloadBuilder()
381+
.streamingBlocks(true)
373382
.apply()
374383
.block({ height: 778575, hash: randomHash() })
375384
.transaction({ hash: '0x9f4a9b73b0713c5da01c0a47f97c6c001af9028d6bdd9e264dfacbc4e6790201' })
@@ -422,6 +431,7 @@ describe('ETag cache', () => {
422431

423432
// New block
424433
const block2 = new TestChainhookPayloadBuilder()
434+
.streamingBlocks(true)
425435
.apply()
426436
.block({ height: 778576, hash: randomHash() })
427437
.transaction({ hash: '0x00000000000000000002a90330a99f67e3f01eb2ce070b45930581e82fb7a91d' })

tests/helpers.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ export class TestChainhookPayloadBuilder {
3030
operation: 'inscription_feed',
3131
meta_protocols: ['brc-20'],
3232
},
33-
is_streaming_blocks: true,
33+
is_streaming_blocks: false,
3434
},
3535
};
3636
private action: 'apply' | 'rollback' = 'apply';

tests/ordhook/replay.test.ts

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ describe('Replay', () => {
2222

2323
test('shuts down when streaming on replay mode', async () => {
2424
const payload1 = new TestChainhookPayloadBuilder()
25+
.streamingBlocks(true)
2526
.apply()
2627
.block({
2728
height: 767430,

tests/ordhook/server.test.ts

+126
Original file line numberDiff line numberDiff line change
@@ -572,6 +572,132 @@ describe('EventServer', () => {
572572
});
573573

574574
describe('gap detection', () => {
575+
test('server rejects payload with first inscription gap when streaming', async () => {
576+
await db.updateInscriptions(
577+
new TestChainhookPayloadBuilder()
578+
.streamingBlocks(false)
579+
.apply()
580+
.block({
581+
height: 778575,
582+
hash: '0x00000000000000000002a90330a99f67e3f01eb2ce070b45930581e82fb7a91d',
583+
timestamp: 1676913207,
584+
})
585+
.transaction({
586+
hash: '9f4a9b73b0713c5da01c0a47f97c6c001af9028d6bdd9e264dfacbc4e6790201',
587+
})
588+
.inscriptionRevealed({
589+
content_bytes: '0x48656C6C6F',
590+
content_type: 'text/plain;charset=utf-8',
591+
content_length: 5,
592+
inscription_number: { classic: 0, jubilee: 0 },
593+
inscription_fee: 705,
594+
inscription_id: '9f4a9b73b0713c5da01c0a47f97c6c001af9028d6bdd9e264dfacbc4e6790201i0',
595+
inscription_output_value: 10000,
596+
inscriber_address: 'bc1pscktlmn99gyzlvymvrezh6vwd0l4kg06tg5rvssw0czg8873gz5sdkteqj',
597+
ordinal_number: 257418248345364,
598+
ordinal_block_height: 650000,
599+
ordinal_offset: 0,
600+
satpoint_post_inscription:
601+
'9f4a9b73b0713c5da01c0a47f97c6c001af9028d6bdd9e264dfacbc4e6790201:0:0',
602+
inscription_input_index: 0,
603+
transfers_pre_inscription: 0,
604+
tx_index: 0,
605+
curse_type: null,
606+
inscription_pointer: null,
607+
delegate: null,
608+
metaprotocol: null,
609+
metadata: null,
610+
parent: null,
611+
})
612+
.build()
613+
);
614+
const errorPayload1 = new TestChainhookPayloadBuilder()
615+
.streamingBlocks(false)
616+
.apply()
617+
.block({
618+
height: 778576,
619+
hash: '00000000000000000002a90330a99f67e3f01eb2ce070b45930581e82fb7a91d',
620+
timestamp: 1676913207,
621+
})
622+
.transaction({
623+
hash: '38c46a8bf7ec90bc7f6b797e7dc84baa97f4e5fd4286b92fe1b50176d03b18dc',
624+
})
625+
.inscriptionRevealed({
626+
content_bytes: '0x48656C6C6F',
627+
content_type: 'text/plain;charset=utf-8',
628+
content_length: 5,
629+
inscription_number: { classic: 5, jubilee: 5 }, // Gap at 5 but block is not streamed
630+
inscription_fee: 705,
631+
inscription_id: '38c46a8bf7ec90bc7f6b797e7dc84baa97f4e5fd4286b92fe1b50176d03b18dci0',
632+
inscription_output_value: 10000,
633+
inscriber_address: 'bc1p3cyx5e2hgh53w7kpxcvm8s4kkega9gv5wfw7c4qxsvxl0u8x834qf0u2td',
634+
ordinal_number: 1050000000000000,
635+
ordinal_block_height: 650000,
636+
ordinal_offset: 0,
637+
satpoint_post_inscription:
638+
'38c46a8bf7ec90bc7f6b797e7dc84baa97f4e5fd4286b92fe1b50176d03b18dc:0:0',
639+
inscription_input_index: 0,
640+
transfers_pre_inscription: 0,
641+
tx_index: 0,
642+
curse_type: null,
643+
inscription_pointer: null,
644+
delegate: null,
645+
metaprotocol: null,
646+
metadata: null,
647+
parent: null,
648+
})
649+
.build();
650+
// Not streamed, accepts block.
651+
await expect(db.updateInscriptions(errorPayload1)).resolves.not.toThrow(
652+
BadPayloadRequestError
653+
);
654+
655+
const errorPayload2 = new TestChainhookPayloadBuilder()
656+
.streamingBlocks(true)
657+
.apply()
658+
.block({
659+
height: 778579,
660+
hash: '00000000000000000002a90330a99f67e3f01eb2ce070b45930581e82fb7a91d',
661+
timestamp: 1676913207,
662+
})
663+
.transaction({
664+
hash: '38c46a8bf7ec90bc7f6b797e7dc84baa97f4e5fd4286b92fe1b50176d03b18dc',
665+
})
666+
.inscriptionRevealed({
667+
content_bytes: '0x48656C6C6F',
668+
content_type: 'text/plain;charset=utf-8',
669+
content_length: 5,
670+
inscription_number: { classic: 10, jubilee: 10 }, // Gap at 10
671+
inscription_fee: 705,
672+
inscription_id: '38c46a8bf7ec90bc7f6b797e7dc84baa97f4e5fd4286b92fe1b50176d03b18dci0',
673+
inscription_output_value: 10000,
674+
inscriber_address: 'bc1p3cyx5e2hgh53w7kpxcvm8s4kkega9gv5wfw7c4qxsvxl0u8x834qf0u2td',
675+
ordinal_number: 1050000000000000,
676+
ordinal_block_height: 650000,
677+
ordinal_offset: 0,
678+
satpoint_post_inscription:
679+
'38c46a8bf7ec90bc7f6b797e7dc84baa97f4e5fd4286b92fe1b50176d03b18dc:0:0',
680+
inscription_input_index: 0,
681+
transfers_pre_inscription: 0,
682+
tx_index: 0,
683+
curse_type: null,
684+
inscription_pointer: null,
685+
delegate: null,
686+
metaprotocol: null,
687+
metadata: null,
688+
parent: null,
689+
})
690+
.build();
691+
await expect(db.updateInscriptions(errorPayload2)).rejects.toThrow(BadPayloadRequestError);
692+
const response = await server['fastify'].inject({
693+
method: 'POST',
694+
url: `/payload`,
695+
headers: { authorization: `Bearer ${ENV.ORDHOOK_NODE_AUTH_TOKEN}` },
696+
payload: errorPayload2,
697+
});
698+
expect(response.statusCode).toBe(400);
699+
});
700+
575701
test('server ignores past blocks', async () => {
576702
const payload = new TestChainhookPayloadBuilder()
577703
.apply()

0 commit comments

Comments
 (0)