Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(blobs): blob sink #10079

Merged
merged 4 commits into from
Jan 4, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions yarn-project/blob-sink/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,21 @@
## Blob Sink

A HTTP api that emulated the https://ethereum.github.io/beacon-APIs/?urls.primaryName=dev#/Beacon/getBlobSidecars API.
A HTTP api that losely emulates the https://ethereum.github.io/beacon-APIs/?urls.primaryName=dev#/Beacon/getBlobSidecars API.
We do not support all of the possible values of block_id, namely `genesis`, `head`, `finalized`. As we are not using any of these values in our
blobs integration.

## When is this used?

This service will run alongside end to end tests to capture the blob transactions that are sent alongside a `propose` transaction.
This service will run alongside end to end tests to capture the blob transactions that are sent alongside a `propose` transaction.

### Why?

Once we make the transition to blob transactions, we will need to be able to query for blobs. One way to do this is to run an entire L1 execution layer and consensus layer pair alongside all of our e2e tests and inside the sandbox. But this is a bit much, so instead the blob sink can be used to store and request blobs, without needing to run an entire consensus layer pair client.

### Other Usecases

Blobs are only held in the L1 consensus layer for a period of ~3 weeks, the blob sink can be used to store blobs for longer.

### How?

The blob sink is a simple HTTP server that can be run alongside the e2e tests. It will store the blobs in a local file system and provide an API to query for them.
115 changes: 79 additions & 36 deletions yarn-project/blob-sink/src/blob-sink.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,39 +19,89 @@ describe('BlobSinkService', () => {
await service.stop();
});

it('should store and retrieve a blob sidecar', async () => {
// Create a test blob
describe('should store and retrieve a blob sidecar', () => {
const testFields = [Fr.random(), Fr.random(), Fr.random()];
const testFields2 = [Fr.random(), Fr.random(), Fr.random()];
const blob = Blob.fromFields(testFields);
const blob2 = Blob.fromFields(testFields2);
const blockId = '0x1234';

// Post the blob
const postResponse = await request(service.getApp())
.post('/blob_sidecar')
.send({
// eslint-disable-next-line camelcase
block_id: blockId,
blobs: [
{
index: 0,
blob: blob.toBuffer(),
},
],
});

expect(postResponse.status).toBe(200);

// Retrieve the blob
const getResponse = await request(service.getApp()).get(`/eth/v1/beacon/blob_sidecars/${blockId}`);

expect(getResponse.status).toBe(200);

// Convert the response blob back to a Blob object and verify it matches
const retrievedBlobs = getResponse.body.data;

const retrievedBlob = Blob.fromBuffer(Buffer.from(retrievedBlobs[0].blob, 'hex'));
expect(retrievedBlob.fieldsHash.toString()).toBe(blob.fieldsHash.toString());
expect(retrievedBlob.commitment.toString('hex')).toBe(blob.commitment.toString('hex'));
beforeEach(async () => {
// Post the blob
const postResponse = await request(service.getApp())
.post('/blob_sidecar')
.send({
// eslint-disable-next-line camelcase
block_id: blockId,
blobs: [
{
index: 0,
blob: blob.toBuffer(),
},
{
index: 1,
blob: blob2.toBuffer(),
},
],
});

expect(postResponse.status).toBe(200);
});

it('should retrieve the blob', async () => {
// Retrieve the blob
const getResponse = await request(service.getApp()).get(`/eth/v1/beacon/blob_sidecars/${blockId}`);

expect(getResponse.status).toBe(200);

// Convert the response blob back to a Blob object and verify it matches
const retrievedBlobs = getResponse.body.data;

const retrievedBlob = Blob.fromBuffer(Buffer.from(retrievedBlobs[0].blob, 'hex'));
const retrievedBlob2 = Blob.fromBuffer(Buffer.from(retrievedBlobs[1].blob, 'hex'));
expect(retrievedBlob.fieldsHash.toString()).toBe(blob.fieldsHash.toString());
expect(retrievedBlob.commitment.toString('hex')).toBe(blob.commitment.toString('hex'));
expect(retrievedBlob2.fieldsHash.toString()).toBe(blob2.fieldsHash.toString());
expect(retrievedBlob2.commitment.toString('hex')).toBe(blob2.commitment.toString('hex'));
});

it('should retrieve specific indicies', async () => {
// We can also request specific indicies
const getWithIndicies = await request(service.getApp()).get(
`/eth/v1/beacon/blob_sidecars/${blockId}?indices=0,1`,
);

expect(getWithIndicies.status).toBe(200);
expect(getWithIndicies.body.data.length).toBe(2);

const retrievedBlobs = getWithIndicies.body.data;
const retrievedBlob = Blob.fromBuffer(Buffer.from(retrievedBlobs[0].blob, 'hex'));
const retrievedBlob2 = Blob.fromBuffer(Buffer.from(retrievedBlobs[1].blob, 'hex'));
expect(retrievedBlob.fieldsHash.toString()).toBe(blob.fieldsHash.toString());
expect(retrievedBlob.commitment.toString('hex')).toBe(blob.commitment.toString('hex'));
expect(retrievedBlob2.fieldsHash.toString()).toBe(blob2.fieldsHash.toString());
expect(retrievedBlob2.commitment.toString('hex')).toBe(blob2.commitment.toString('hex'));
});

it('should retreive a single index', async () => {
const getWithIndicies = await request(service.getApp()).get(`/eth/v1/beacon/blob_sidecars/${blockId}?indices=1`);

expect(getWithIndicies.status).toBe(200);
expect(getWithIndicies.body.data.length).toBe(1);

const retrievedBlobs = getWithIndicies.body.data;
const retrievedBlob = Blob.fromBuffer(Buffer.from(retrievedBlobs[0].blob, 'hex'));
expect(retrievedBlob.fieldsHash.toString()).toBe(blob2.fieldsHash.toString());
expect(retrievedBlob.commitment.toString('hex')).toBe(blob2.commitment.toString('hex'));
});
});

it('should return an error if invalid indicies are provided', async () => {
const blockId = '0x1234';

const response = await request(service.getApp()).get(`/eth/v1/beacon/blob_sidecars/${blockId}?indices=word`);
expect(response.status).toBe(400);
expect(response.body.error).toBe('Invalid indices parameter');
});

it('should return an error if the block ID is invalid (POST)', async () => {
Expand All @@ -75,13 +125,6 @@ describe('BlobSinkService', () => {
expect(response.status).toBe(404);
});

it('should reject invalid block IDs', async () => {
const response = await request(service.getApp()).get('/eth/v1/beacon/blob_sidecars/invalid-id');

expect(response.status).toBe(400);
expect(response.body.error).toBe('Invalid block_id parameter');
});

it('should reject negative block IDs', async () => {
const response = await request(service.getApp()).get('/eth/v1/beacon/blob_sidecars/-123');

Expand Down
50 changes: 49 additions & 1 deletion yarn-project/blob-sink/src/blobstore/blob_store_test_suite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export function describeBlobStore(getBlobStore: () => BlobStore) {
// Create a test blob with random fields
const testFields = [Fr.random(), Fr.random(), Fr.random()];
const blob = Blob.fromFields(testFields);
const blockId = '12345';
const blockId = '0x12345';
const blobWithIndex = new BlobWithIndex(blob, 0);

// Store the blob
Expand All @@ -31,6 +31,54 @@ export function describeBlobStore(getBlobStore: () => BlobStore) {
expect(retrievedBlob.blob.commitment.toString('hex')).toBe(blob.commitment.toString('hex'));
});

it('Should allow requesting a specific index of blob', async () => {
const testFields = [Fr.random(), Fr.random(), Fr.random()];
const blob = Blob.fromFields(testFields);
const blockId = '0x12345';
const blobWithIndex = new BlobWithIndex(blob, 0);
const blobWithIndex2 = new BlobWithIndex(blob, 1);

await blobStore.addBlobSidecars(blockId, [blobWithIndex, blobWithIndex2]);

const retrievedBlobs = await blobStore.getBlobSidecars(blockId, [0]);
const [retrievedBlob] = retrievedBlobs!;

expect(retrievedBlob.blob.fieldsHash.toString()).toBe(blob.fieldsHash.toString());
expect(retrievedBlob.blob.commitment.toString('hex')).toBe(blob.commitment.toString('hex'));

const retrievedBlobs2 = await blobStore.getBlobSidecars(blockId, [1]);
const [retrievedBlob2] = retrievedBlobs2!;

expect(retrievedBlob2.blob.fieldsHash.toString()).toBe(blob.fieldsHash.toString());
expect(retrievedBlob2.blob.commitment.toString('hex')).toBe(blob.commitment.toString('hex'));
});

it('Differentiate between blockHash and slot', async () => {
const testFields = [Fr.random(), Fr.random(), Fr.random()];
const testFieldsSlot = [Fr.random(), Fr.random(), Fr.random()];
const blob = Blob.fromFields(testFields);
const blobSlot = Blob.fromFields(testFieldsSlot);
const blockId = '0x12345';
const slot = '12345';
const blobWithIndex = new BlobWithIndex(blob, 0);
const blobWithIndexSlot = new BlobWithIndex(blobSlot, 0);

await blobStore.addBlobSidecars(blockId, [blobWithIndex]);
await blobStore.addBlobSidecars(slot, [blobWithIndexSlot]);

const retrievedBlobs = await blobStore.getBlobSidecars(blockId, [0]);
const [retrievedBlob] = retrievedBlobs!;

expect(retrievedBlob.blob.fieldsHash.toString()).toBe(blob.fieldsHash.toString());
expect(retrievedBlob.blob.commitment.toString('hex')).toBe(blob.commitment.toString('hex'));

const retrievedBlobs2 = await blobStore.getBlobSidecars(slot, [0]);
const [retrievedBlob2] = retrievedBlobs2!;

expect(retrievedBlob2.blob.fieldsHash.toString()).toBe(blobSlot.fieldsHash.toString());
expect(retrievedBlob2.blob.commitment.toString('hex')).toBe(blobSlot.commitment.toString('hex'));
});

it('should return undefined for non-existent blob', async () => {
const nonExistentBlob = await blobStore.getBlobSidecars('999999');
expect(nonExistentBlob).toBeUndefined();
Expand Down
11 changes: 9 additions & 2 deletions yarn-project/blob-sink/src/blobstore/disk_blob_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,19 @@ export class DiskBlobStore implements BlobStore {
this.blobs = store.openMap('blobs');
}

public getBlobSidecars(blockId: string): Promise<BlobWithIndex[] | undefined> {
public getBlobSidecars(blockId: string, indices?: number[]): Promise<BlobWithIndex[] | undefined> {
const blobBuffer = this.blobs.get(`${blockId}`);
if (!blobBuffer) {
return Promise.resolve(undefined);
}
return Promise.resolve(BlobsWithIndexes.fromBuffer(blobBuffer).blobs);

const blobsWithIndexes = BlobsWithIndexes.fromBuffer(blobBuffer);
if (indices) {
// If indices are provided, return the blobs at the specified indices
return Promise.resolve(blobsWithIndexes.getBlobsFromIndices(indices));
}
// If no indices are provided, return all blobs
return Promise.resolve(blobsWithIndexes.blobs);
}

public async addBlobSidecars(blockId: string, blobSidecars: BlobWithIndex[]): Promise<void> {
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/blob-sink/src/blobstore/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ export interface BlobStore {
/**
* Get a blob by block id
*/
getBlobSidecars: (blockId: string) => Promise<BlobWithIndex[] | undefined>;
getBlobSidecars: (blockId: string, indices?: number[]) => Promise<BlobWithIndex[] | undefined>;
/**
* Add a blob to the store
*/
Expand Down
10 changes: 8 additions & 2 deletions yarn-project/blob-sink/src/blobstore/memory_blob_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,18 @@ import { type BlobStore } from './interface.js';
export class MemoryBlobStore implements BlobStore {
private blobs: Map<string, Buffer> = new Map();

public getBlobSidecars(blockId: string): Promise<BlobWithIndex[] | undefined> {
public getBlobSidecars(blockId: string, indices?: number[]): Promise<BlobWithIndex[] | undefined> {
const blobBuffer = this.blobs.get(blockId);
if (!blobBuffer) {
return Promise.resolve(undefined);
}
return Promise.resolve(BlobsWithIndexes.fromBuffer(blobBuffer).blobs);
const blobsWithIndexes = BlobsWithIndexes.fromBuffer(blobBuffer);
if (indices) {
// If indices are provided, return the blobs at the specified indices
return Promise.resolve(blobsWithIndexes.getBlobsFromIndices(indices));
}
// If no indices are provided, return all blobs
return Promise.resolve(blobsWithIndexes.blobs);
}

public addBlobSidecars(blockId: string, blobSidecars: BlobWithIndex[]): Promise<void> {
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/blob-sink/src/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export class BlobSinkMetrics {
});

this.blobSize = telemetry.getMeter(name).createHistogram(Metrics.BLOB_SINK_BLOB_SIZE, {
description: 'The size of blobs in the blob store',
description: 'The non zero size of blobs in the blob store',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think non-padded would convey it better. Or the size of non-zero elements or something. But guess you can understand so probably fine.

});
}

Expand Down
22 changes: 15 additions & 7 deletions yarn-project/blob-sink/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { type BlobStore, DiskBlobStore } from './blobstore/index.js';
import { MemoryBlobStore } from './blobstore/memory_blob_store.js';
import { type BlobSinkConfig } from './config.js';
import { BlobSinkMetrics } from './metrics.js';
import { type PostBlobSidecarRequest, blockIdSchema } from './types/api.js';
import { type PostBlobSidecarRequest, blockIdSchema, indicesSchema } from './types/api.js';
import { BlobWithIndex } from './types/index.js';

/**
Expand Down Expand Up @@ -47,26 +47,34 @@ export class BlobSinkServer {
}

private setupRoutes() {
this.app.get('/eth/v1/beacon/blob_sidecars/:block_id', this.handleBlobSidecar.bind(this));
this.app.get('/eth/v1/beacon/blob_sidecars/:block_id', this.handleGetBlobSidecar.bind(this));
this.app.post('/blob_sidecar', this.handlePostBlobSidecar.bind(this));
}

private async handleBlobSidecar(req: Request, res: Response) {
private async handleGetBlobSidecar(req: Request, res: Response) {
// eslint-disable-next-line camelcase
const { block_id } = req.params;
const { indices } = req.query;

try {
// eslint-disable-next-line camelcase
const parsedBlockId = blockIdSchema.parse(block_id);

if (!parsedBlockId) {
const parsedBlockId = blockIdSchema.safeParse(block_id);
if (!parsedBlockId.success) {
res.status(400).json({
error: 'Invalid block_id parameter',
});
return;
}

const blobs = await this.blobStore.getBlobSidecars(parsedBlockId.toString());
const parsedIndices = indicesSchema.safeParse(indices);
if (!parsedIndices.success) {
res.status(400).json({
error: 'Invalid indices parameter',
});
return;
}

const blobs = await this.blobStore.getBlobSidecars(parsedBlockId.data.toString(), parsedIndices.data);

if (!blobs) {
res.status(404).json({ error: 'Blob not found' });
Expand Down
20 changes: 18 additions & 2 deletions yarn-project/blob-sink/src/types/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,27 @@ export interface PostBlobSidecarRequest {
}>;
}

// Validation schemas
export const blockIdSchema = z.coerce
export const blockRootSchema = z
.string()
.regex(/^0x[0-9a-fA-F]{0,64}$/)
.max(66);
export const slotSchema = z.number().int().positive();

// Define the Zod schema for an array of numbers
export const indicesSchema = z.optional(
z
.string()
.refine(str => str.split(',').every(item => !isNaN(Number(item))), {
message: 'All items in the query must be valid numbers.',
})
.transform(str => str.split(',').map(Number)),
); // Convert to an array of numbers

// Validation schemas
// Block identifier. Can be one of: <slot>, <hex encoded blockRoot with 0x prefix>.
// Note the spec https://ethereum.github.io/beacon-APIs/?urls.primaryName=dev#/Beacon/getBlobSidecars does allows for "head", "genesis", "finalized" as valid block ids,
// but we explicitly do not support these values.
export const blockIdSchema = blockRootSchema.or(slotSchema);

export const postBlobSidecarSchema = z.object({
// eslint-disable-next-line camelcase
Expand Down
4 changes: 4 additions & 0 deletions yarn-project/blob-sink/src/types/blob_with_index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ export class BlobsWithIndexes {
const reader = BufferReader.asReader(buffer);
return new BlobsWithIndexes(reader.readArray(reader.readNumber(), BlobWithIndex));
}

public getBlobsFromIndices(indices: number[]): BlobWithIndex[] {
return this.blobs.filter((_, index) => indices.includes(index));
}
}

/** We store blobs alongside their index in the block */
Expand Down
2 changes: 1 addition & 1 deletion yarn-project/end-to-end/src/fixtures/snapshot_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ async function setupFromFresh(
} else {
aztecNodeConfig.dataDirectory = statePath;
}
aztecNodeConfig.blobSinkUrl = `http://127.0.0.1:${blobSinkPort}`;
aztecNodeConfig.blobSinkUrl = `http://localhost:${blobSinkPort}`;

// Setup blob sink service
const blobSink = await createBlobSinkServer({
Expand Down
Loading