Skip to content

Commit ecc037f

Browse files
authored
fix: memory leak in the broker (#10567)
This PR fixes a memory leak in the prover broker by cleaning up jobs after their result is saved by the orchestrator. The orchestrator then does clean up on its own after the epoch is finished.
1 parent 09e95a1 commit ecc037f

11 files changed

+483
-174
lines changed

yarn-project/circuit-types/src/interfaces/prover-broker.ts

+8-2
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,16 @@ export interface ProvingJobProducer {
5656
enqueueProvingJob(job: ProvingJob): Promise<void>;
5757

5858
/**
59-
* Cancels a proving job and clears all of its
59+
* Cancels a proving job.
6060
* @param id - The ID of the job to cancel
6161
*/
62-
removeAndCancelProvingJob(id: ProvingJobId): Promise<void>;
62+
cancelProvingJob(id: ProvingJobId): Promise<void>;
63+
64+
/**
65+
* Cleans up after a job has completed. Throws if the job is in-progress
66+
* @param id - The ID of the job to cancel
67+
*/
68+
cleanUpProvingJobState(id: ProvingJobId): Promise<void>;
6369

6470
/**
6571
* Returns the current status fof the proving job

yarn-project/circuit-types/src/interfaces/proving-job-source.test.ts

+1
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ class MockProvingJobSource implements ProvingJobSource {
7070
id: 'a-job-id',
7171
type: ProvingRequestType.PRIVATE_BASE_ROLLUP,
7272
inputsUri: 'inputs-uri' as ProofUri,
73+
epochNumber: 1,
7374
});
7475
}
7576
heartbeat(jobId: string): Promise<void> {

yarn-project/circuit-types/src/interfaces/proving-job.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ export type ProvingJobId = z.infer<typeof ProvingJobId>;
241241
export const ProvingJob = z.object({
242242
id: ProvingJobId,
243243
type: z.nativeEnum(ProvingRequestType),
244-
blockNumber: z.number().optional(),
244+
epochNumber: z.number(),
245245
inputsUri: ProofUri,
246246
});
247247

yarn-project/prover-client/src/prover-agent/memory-proving-queue.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ export class MemoryProvingQueue implements ServerCircuitProver, ProvingJobSource
120120
id: job.id,
121121
type: job.type,
122122
inputsUri: job.inputsUri,
123+
epochNumber: job.epochNumber,
123124
};
124125
} catch (err) {
125126
if (err instanceof TimeoutError) {
@@ -244,7 +245,7 @@ export class MemoryProvingQueue implements ServerCircuitProver, ProvingJobSource
244245
reject,
245246
attempts: 1,
246247
heartbeat: 0,
247-
epochNumber,
248+
epochNumber: epochNumber ?? 0,
248249
};
249250

250251
if (signal) {

yarn-project/prover-client/src/proving_broker/caching_broker_facade.test.ts

+53-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ describe('CachingBrokerFacade', () => {
2424
broker = mock<ProvingJobProducer>({
2525
enqueueProvingJob: jest.fn<any>(),
2626
getProvingJobStatus: jest.fn<any>(),
27-
removeAndCancelProvingJob: jest.fn<any>(),
27+
cancelProvingJob: jest.fn<any>(),
28+
cleanUpProvingJobState: jest.fn<any>(),
2829
waitForJobToSettle: jest.fn<any>(),
2930
});
3031
cache = new InMemoryProverCache();
@@ -101,4 +102,55 @@ describe('CachingBrokerFacade', () => {
101102
await expect(facade.getBaseParityProof(inputs)).resolves.toEqual(result);
102103
expect(broker.enqueueProvingJob).toHaveBeenCalledTimes(1); // job was only ever enqueued once
103104
});
105+
106+
it('clears broker state after a job resolves', async () => {
107+
const { promise, resolve } = promiseWithResolvers<any>();
108+
broker.enqueueProvingJob.mockResolvedValue(Promise.resolve());
109+
broker.waitForJobToSettle.mockResolvedValue(promise);
110+
111+
const inputs = makeBaseParityInputs();
112+
void facade.getBaseParityProof(inputs);
113+
await jest.advanceTimersToNextTimerAsync();
114+
115+
const job = broker.enqueueProvingJob.mock.calls[0][0];
116+
const result = makePublicInputsAndRecursiveProof(
117+
makeParityPublicInputs(),
118+
makeRecursiveProof(RECURSIVE_PROOF_LENGTH),
119+
VerificationKeyData.makeFakeHonk(),
120+
);
121+
const outputUri = await proofStore.saveProofOutput(job.id, ProvingRequestType.BASE_PARITY, result);
122+
resolve({
123+
status: 'fulfilled',
124+
value: outputUri,
125+
});
126+
127+
await jest.advanceTimersToNextTimerAsync();
128+
expect(broker.cleanUpProvingJobState).toHaveBeenCalled();
129+
});
130+
131+
it('clears broker state after a job is canceled', async () => {
132+
const { promise, resolve } = promiseWithResolvers<any>();
133+
const catchSpy = jest.fn();
134+
broker.enqueueProvingJob.mockResolvedValue(Promise.resolve());
135+
broker.waitForJobToSettle.mockResolvedValue(promise);
136+
137+
const inputs = makeBaseParityInputs();
138+
const controller = new AbortController();
139+
void facade.getBaseParityProof(inputs, controller.signal).catch(catchSpy);
140+
await jest.advanceTimersToNextTimerAsync();
141+
142+
expect(broker.cancelProvingJob).not.toHaveBeenCalled();
143+
controller.abort();
144+
await jest.advanceTimersToNextTimerAsync();
145+
expect(broker.cancelProvingJob).toHaveBeenCalled();
146+
147+
resolve({
148+
status: 'rejected',
149+
reason: 'Aborted',
150+
});
151+
152+
await jest.advanceTimersToNextTimerAsync();
153+
expect(broker.cleanUpProvingJobState).toHaveBeenCalled();
154+
expect(catchSpy).toHaveBeenCalledWith(new Error('Aborted'));
155+
});
104156
});

yarn-project/prover-client/src/proving_broker/caching_broker_facade.ts

+29-13
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ export class CachingBrokerFacade implements ServerCircuitProver {
5959
id: ProvingJobId,
6060
type: T,
6161
inputs: ProvingJobInputsMap[T],
62+
epochNumber = 0,
6263
signal?: AbortSignal,
6364
): Promise<ProvingJobResultsMap[T]> {
6465
// first try the cache
@@ -95,6 +96,7 @@ export class CachingBrokerFacade implements ServerCircuitProver {
9596
id,
9697
type,
9798
inputsUri,
99+
epochNumber,
98100
});
99101
await this.cache.setProvingJobStatus(id, { status: 'in-queue' });
100102
} catch (err) {
@@ -107,7 +109,7 @@ export class CachingBrokerFacade implements ServerCircuitProver {
107109
// notify broker of cancelled job
108110
const abortFn = async () => {
109111
signal?.removeEventListener('abort', abortFn);
110-
await this.broker.removeAndCancelProvingJob(id);
112+
await this.broker.cancelProvingJob(id);
111113
};
112114

113115
signal?.addEventListener('abort', abortFn);
@@ -147,160 +149,174 @@ export class CachingBrokerFacade implements ServerCircuitProver {
147149
}
148150
} finally {
149151
signal?.removeEventListener('abort', abortFn);
152+
// we've saved the result in our cache. We can tell the broker to clear its state
153+
await this.broker.cleanUpProvingJobState(id);
150154
}
151155
}
152156

153157
getAvmProof(
154158
inputs: AvmCircuitInputs,
155159
signal?: AbortSignal,
156-
_blockNumber?: number,
160+
epochNumber?: number,
157161
): Promise<ProofAndVerificationKey<typeof AVM_PROOF_LENGTH_IN_FIELDS>> {
158162
return this.enqueueAndWaitForJob(
159163
this.generateId(ProvingRequestType.PUBLIC_VM, inputs),
160164
ProvingRequestType.PUBLIC_VM,
161165
inputs,
166+
epochNumber,
162167
signal,
163168
);
164169
}
165170

166171
getBaseParityProof(
167172
inputs: BaseParityInputs,
168173
signal?: AbortSignal,
169-
_epochNumber?: number,
174+
epochNumber?: number,
170175
): Promise<PublicInputsAndRecursiveProof<ParityPublicInputs, typeof RECURSIVE_PROOF_LENGTH>> {
171176
return this.enqueueAndWaitForJob(
172177
this.generateId(ProvingRequestType.BASE_PARITY, inputs),
173178
ProvingRequestType.BASE_PARITY,
174179
inputs,
180+
epochNumber,
175181
signal,
176182
);
177183
}
178184

179185
getBlockMergeRollupProof(
180186
input: BlockMergeRollupInputs,
181187
signal?: AbortSignal,
182-
_epochNumber?: number,
188+
epochNumber?: number,
183189
): Promise<PublicInputsAndRecursiveProof<BlockRootOrBlockMergePublicInputs, typeof RECURSIVE_PROOF_LENGTH>> {
184190
return this.enqueueAndWaitForJob(
185191
this.generateId(ProvingRequestType.BLOCK_MERGE_ROLLUP, input),
186192
ProvingRequestType.BLOCK_MERGE_ROLLUP,
187193
input,
194+
epochNumber,
188195
signal,
189196
);
190197
}
191198

192199
getBlockRootRollupProof(
193200
input: BlockRootRollupInputs,
194201
signal?: AbortSignal,
195-
_epochNumber?: number,
202+
epochNumber?: number,
196203
): Promise<PublicInputsAndRecursiveProof<BlockRootOrBlockMergePublicInputs, typeof RECURSIVE_PROOF_LENGTH>> {
197204
return this.enqueueAndWaitForJob(
198205
this.generateId(ProvingRequestType.BLOCK_ROOT_ROLLUP, input),
199206
ProvingRequestType.BLOCK_ROOT_ROLLUP,
200207
input,
208+
epochNumber,
201209
signal,
202210
);
203211
}
204212

205213
getEmptyBlockRootRollupProof(
206214
input: EmptyBlockRootRollupInputs,
207215
signal?: AbortSignal,
208-
_epochNumber?: number,
216+
epochNumber?: number,
209217
): Promise<PublicInputsAndRecursiveProof<BlockRootOrBlockMergePublicInputs>> {
210218
return this.enqueueAndWaitForJob(
211219
this.generateId(ProvingRequestType.EMPTY_BLOCK_ROOT_ROLLUP, input),
212220
ProvingRequestType.EMPTY_BLOCK_ROOT_ROLLUP,
213221
input,
222+
epochNumber,
214223
signal,
215224
);
216225
}
217226

218227
getEmptyPrivateKernelProof(
219228
inputs: PrivateKernelEmptyInputData,
220229
signal?: AbortSignal,
221-
_epochNumber?: number,
230+
epochNumber?: number,
222231
): Promise<PublicInputsAndRecursiveProof<KernelCircuitPublicInputs, typeof RECURSIVE_PROOF_LENGTH>> {
223232
return this.enqueueAndWaitForJob(
224233
this.generateId(ProvingRequestType.PRIVATE_KERNEL_EMPTY, inputs),
225234
ProvingRequestType.PRIVATE_KERNEL_EMPTY,
226235
inputs,
236+
epochNumber,
227237
signal,
228238
);
229239
}
230240

231241
getMergeRollupProof(
232242
input: MergeRollupInputs,
233243
signal?: AbortSignal,
234-
_epochNumber?: number,
244+
epochNumber?: number,
235245
): Promise<PublicInputsAndRecursiveProof<BaseOrMergeRollupPublicInputs, typeof RECURSIVE_PROOF_LENGTH>> {
236246
return this.enqueueAndWaitForJob(
237247
this.generateId(ProvingRequestType.MERGE_ROLLUP, input),
238248
ProvingRequestType.MERGE_ROLLUP,
239249
input,
250+
epochNumber,
240251
signal,
241252
);
242253
}
243254
getPrivateBaseRollupProof(
244255
baseRollupInput: PrivateBaseRollupInputs,
245256
signal?: AbortSignal,
246-
_epochNumber?: number,
257+
epochNumber?: number,
247258
): Promise<PublicInputsAndRecursiveProof<BaseOrMergeRollupPublicInputs, typeof RECURSIVE_PROOF_LENGTH>> {
248259
return this.enqueueAndWaitForJob(
249260
this.generateId(ProvingRequestType.PRIVATE_BASE_ROLLUP, baseRollupInput),
250261
ProvingRequestType.PRIVATE_BASE_ROLLUP,
251262
baseRollupInput,
263+
epochNumber,
252264
signal,
253265
);
254266
}
255267

256268
getPublicBaseRollupProof(
257269
inputs: PublicBaseRollupInputs,
258270
signal?: AbortSignal,
259-
_epochNumber?: number,
271+
epochNumber?: number,
260272
): Promise<PublicInputsAndRecursiveProof<BaseOrMergeRollupPublicInputs, typeof RECURSIVE_PROOF_LENGTH>> {
261273
return this.enqueueAndWaitForJob(
262274
this.generateId(ProvingRequestType.PUBLIC_BASE_ROLLUP, inputs),
263275
ProvingRequestType.PUBLIC_BASE_ROLLUP,
264276
inputs,
277+
epochNumber,
265278
signal,
266279
);
267280
}
268281

269282
getRootParityProof(
270283
inputs: RootParityInputs,
271284
signal?: AbortSignal,
272-
_epochNumber?: number,
285+
epochNumber?: number,
273286
): Promise<PublicInputsAndRecursiveProof<ParityPublicInputs, typeof NESTED_RECURSIVE_PROOF_LENGTH>> {
274287
return this.enqueueAndWaitForJob(
275288
this.generateId(ProvingRequestType.ROOT_PARITY, inputs),
276289
ProvingRequestType.ROOT_PARITY,
277290
inputs,
291+
epochNumber,
278292
signal,
279293
);
280294
}
281295

282296
getRootRollupProof(
283297
input: RootRollupInputs,
284298
signal?: AbortSignal,
285-
_epochNumber?: number,
299+
epochNumber?: number,
286300
): Promise<PublicInputsAndRecursiveProof<RootRollupPublicInputs, typeof RECURSIVE_PROOF_LENGTH>> {
287301
return this.enqueueAndWaitForJob(
288302
this.generateId(ProvingRequestType.ROOT_ROLLUP, input),
289303
ProvingRequestType.ROOT_ROLLUP,
290304
input,
305+
epochNumber,
291306
signal,
292307
);
293308
}
294309

295310
getTubeProof(
296311
tubeInput: TubeInputs,
297312
signal?: AbortSignal,
298-
_epochNumber?: number,
313+
epochNumber?: number,
299314
): Promise<ProofAndVerificationKey<typeof TUBE_PROOF_LENGTH>> {
300315
return this.enqueueAndWaitForJob(
301316
this.generateId(ProvingRequestType.TUBE_PROOF, tubeInput),
302317
ProvingRequestType.TUBE_PROOF,
303318
tubeInput,
319+
epochNumber,
304320
signal,
305321
);
306322
}

yarn-project/prover-client/src/proving_broker/proving_agent.test.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ describe('ProvingAgent', () => {
239239
const inputs: ProvingJobInputs = { type: ProvingRequestType.BASE_PARITY, inputs: makeBaseParityInputs() };
240240
const job: ProvingJob = {
241241
id: randomBytes(8).toString('hex') as ProvingJobId,
242-
blockNumber: 1,
242+
epochNumber: 1,
243243
type: ProvingRequestType.BASE_PARITY,
244244
inputsUri: randomBytes(8).toString('hex') as ProofUri,
245245
};

0 commit comments

Comments
 (0)