Skip to content

Commit 97362d2

Browse files
committed
feat(camunda): implement backoff on errors for REST job worker
fixes #370
1 parent 22ef706 commit 97362d2

File tree

9 files changed

+296
-24
lines changed

9 files changed

+296
-24
lines changed

README.md

+9
Original file line numberDiff line numberDiff line change
@@ -341,3 +341,12 @@ client.streamJobs({
341341
timeout: 2000,
342342
})
343343
```
344+
345+
## Polling worker backoff in error conditions
346+
347+
When a polling worker encounters an error (including an authentication failure), it backs off before polling again. The backoff delay grows by 2 seconds with each consecutive failure, up to a maximum of `CAMUNDA_JOB_WORKER_MAX_BACKOFF_MS` (default: 15000, i.e. 15 seconds), and resets after the next successful poll. If the failure is due to invalid credentials and occurs during the token request, the worker backoff is compounded with the token endpoint backoff, which grows by 1000ms with each consecutive failure, up to a maximum of 15 seconds.
348+
349+
This means that if you start a worker with invalid credentials, then the polling backoff will look like this, by default (times in seconds): 3, 6, 9, 12, 15, 18, 21, 23, 24, 25, 26, 27, 28, 29, 30, 30, 30...
350+
351+
If the worker is backing off for a reason other than invalid credentials - for example, a backpressure signal from the gateway - the polling backoff will be (times in seconds): 2, 4, 6, 8, 10, 12, 14, 15, 15, 15...
352+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
import { format } from 'winston'
2+
import Transport from 'winston-transport'
3+
4+
import { createLogger } from '../../../c8/lib/C8Logger'
5+
import { CamundaRestClient } from '../../../c8/lib/CamundaRestClient'
6+
7+
jest.setTimeout(30000)
8+
9+
// Custom log transport to suppress errors in the console, and allow them to be examined
10+
/**
 * Winston transport that keeps everything it receives in memory instead of
 * writing it anywhere, so tests stay quiet and can inspect what was logged.
 */
class MemoryTransport extends Transport {
  /** Entries that were not Error instances, in the order they were received. */
  logs: { message: string; level: string; timestamp: string }[] = []
  /** Entries that arrived as Error instances, in the order they were received. */
  errors: Error[] = []

  constructor(opts?: Transport.TransportStreamOptions | undefined) {
    super(opts)
  }

  /**
   * Record a single log entry and signal completion to the caller.
   *
   * @param info the entry being logged; Error instances go to `errors`, everything else to `logs`
   * @param callback invoked synchronously once the entry has been recorded
   */
  log(
    info: Error | { message: string; level: string; timestamp: string },
    callback: () => void
  ) {
    // Winston requires a 'logged' event for the entry; it is emitted asynchronously via setImmediate.
    setImmediate(() => this.emit('logged', info))

    if (!(info instanceof Error)) {
      this.logs.push(info)
    } else {
      this.errors.push(info)
    }
    callback()
  }
}
34+
35+
/**
 * Starts a REST job worker with CAMUNDA_AUTH_STRATEGY 'NONE', lets it run for 25 seconds,
 * and then checks the number of polls, the sum of the emitted backoff durations,
 * and that every emitted backoff is longer than the one before it.
 */
test('REST worker will backoff on UNAUTHENTICATED', (done) => {
  // Every duration reported through the worker's 'backoff' event, in order.
  const observedBackoffs: number[] = []
  let totalBackoffMs = 0
  let pollEvents = 0

  // Send log output to memory so errors logged during the test do not reach the console.
  const memoryTransport = new MemoryTransport()
  const logger = createLogger({ transports: [memoryTransport] })

  const client = new CamundaRestClient({
    config: { CAMUNDA_AUTH_STRATEGY: 'NONE', logger },
  })
  const worker = client.createJobWorker({
    type: 'unauthenticated-worker',
    jobHandler: async () => {
      throw new Error('Not Implemented') // is never called
    },
    worker: 'unauthenticated-test-worker',
    maxJobsToActivate: 10,
    timeout: 30000,
  })

  worker.on('backoff', (duration) => {
    totalBackoffMs += duration
    observedBackoffs.push(duration)
  })
  worker.on('poll', () => {
    pollEvents += 1
  })

  const verifyAndFinish = () => {
    worker.stop()
    expect(totalBackoffMs).toBe(20000)
    expect(pollEvents).toBe(3)
    // The backoff must grow strictly from one failure to the next.
    observedBackoffs.forEach((current, index) => {
      if (index > 0) {
        expect(current).toBeGreaterThan(observedBackoffs[index - 1])
      }
    })
    done()
  }
  setTimeout(verifyAndFinish, 25000)
})
76+
77+
/**
 * Starts a REST job worker with CAMUNDA_JOB_WORKER_MAX_BACKOFF_MS set to 2000 and checks,
 * after 10 seconds, that no emitted backoff exceeded that cap and that exactly
 * three polls and three backoffs were observed.
 */
test('REST worker uses a supplied custom max backoff', (done) => {
  const MAX_BACKOFF = 2000
  const seenBackoffs: number[] = []
  let polls = 0

  // Send log output to memory rather than the console.
  const memoryTransport = new MemoryTransport()
  const logger = createLogger({ transports: [memoryTransport] })

  const restClient = new CamundaRestClient({
    config: {
      CAMUNDA_AUTH_STRATEGY: 'NONE',
      CAMUNDA_JOB_WORKER_MAX_BACKOFF_MS: MAX_BACKOFF,
      logger,
    },
  })
  const worker = restClient.createJobWorker({
    type: 'unauthenticated-worker',
    jobHandler: async () => {
      throw new Error('Not Implemented') // is never called
    },
    worker: 'unauthenticated-test-worker',
    maxJobsToActivate: 10,
    timeout: 30000,
  })

  worker.on('backoff', (duration) => {
    // Each individual backoff has to respect the configured ceiling.
    expect(duration).toBeLessThanOrEqual(MAX_BACKOFF)
    seenBackoffs.push(duration)
  })
  worker.on('poll', () => {
    polls += 1
  })

  const verifyAndFinish = () => {
    worker.stop()
    expect(polls).toBe(3)
    expect(seenBackoffs.length).toBe(3)
    done()
  }
  setTimeout(verifyAndFinish, 10000)
})
116+
117+
/**
118+
 * This test is deliberately skipped (via `xtest`). The behaviour was manually verified on 5 Feb, 2025.
119+
*
120+
* Testing the outer bound of the token endpoint backoff when it is hardcoded to 15s takes a long time.
121+
* Making the max token endpoint backoff configurable would make this easier to test.
122+
*
123+
*/
124+
/**
 * Skipped test: starts a REST job worker with a non-existent client id and a custom
 * max worker backoff of 2000ms, lets it run for 20 seconds, prints the observed delays
 * between backoff log entries, and checks the poll count and total backoff duration.
 */
xtest('REST worker uses a supplied custom max backoff with invalid secret', (done) => {
  const MAX_BACKOFF = 2000
  const seenBackoffs: number[] = []
  let totalBackoffMs = 0
  let polls = 0

  /**
   * Capture all logging output in memory. The token endpoint will emit error logs during this test.
   * Timestamps are added to each entry so the gaps between backoff messages can be measured.
   */
  const memoryTransport = new MemoryTransport()
  const logger = createLogger({
    format: format.combine(format.timestamp(), format.json()),
    transports: [memoryTransport],
  })

  const restClient = new CamundaRestClient({
    config: {
      CAMUNDA_TOKEN_DISK_CACHE_DISABLE: true,
      ZEEBE_CLIENT_ID: 'Does-not-exist',
      ZEEBE_CLIENT_SECRET: 'NONE',
      CAMUNDA_JOB_WORKER_MAX_BACKOFF_MS: MAX_BACKOFF,
      logger,
    },
  })
  const worker = restClient.createJobWorker({
    type: 'unauthenticated-worker',
    jobHandler: async () => {
      throw new Error('Not Implemented') // is never called
    },
    worker: 'unauthenticated-test-worker',
    maxJobsToActivate: 10,
    timeout: 30000,
    autoStart: false, // Polling starts only after the listeners below are attached
  })

  worker.on('backoff', (duration) => {
    totalBackoffMs += duration
    seenBackoffs.push(duration)
  })
  worker.on('poll', () => {
    polls += 1
  })
  worker.start() // Listeners are in place; begin polling

  const verifyAndFinish = () => {
    worker.stop()

    // Epoch-millisecond timestamps of every backoff message the worker logged.
    const backoffLogTimes = memoryTransport.logs
      .filter((entry) =>
        entry.message.includes('Backing off worker poll due to failure.')
      )
      .map((entry) => new Date(entry.timestamp).getTime())
    console.log('times.length', backoffLogTimes.length)

    // Gap between each backoff message and the one before it.
    const delays: number[] = backoffLogTimes
      .slice(1)
      .map((time, index) => time - backoffLogTimes[index])

    // The bound check on successive delays (difference <= MAX_BACKOFF) is currently disabled;
    // every delay after the first is only printed for manual inspection.
    delays.slice(1).forEach((delay) => {
      console.log(delay)
    })

    expect(polls).toBe(4)
    expect(totalBackoffMs).toBe(8000)
    done()
  }
  setTimeout(verifyAndFinish, 20000)
})

src/__tests__/zeebe/integration/Worker-Backoff.spec.ts

+2-8
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,19 @@
11
import { restoreZeebeLogging, suppressZeebeLogging } from '../../../lib'
22
import { ZeebeGrpcClient } from '../../../zeebe'
33

4-
/**
5-
* This is a manually verified test. To check it, comment out the next line, then check the console output.
6-
* You should see the error messages from the worker, and the backoff expanding the time between them.
7-
*/
84
suppressZeebeLogging()
95

106
jest.setTimeout(30000)
117
afterAll(() => {
128
restoreZeebeLogging()
139
})
1410

15-
test('Will backoff on UNAUTHENTICATED', (done) => {
11+
test('gRPC worker will backoff on UNAUTHENTICATED', (done) => {
1612
let durations = 0
1713

1814
const zbc = new ZeebeGrpcClient({
1915
config: {
2016
CAMUNDA_AUTH_STRATEGY: 'NONE',
21-
CAMUNDA_LOG_LEVEL: 'DEBUG',
2217
},
2318
})
2419

@@ -38,13 +33,12 @@ test('Will backoff on UNAUTHENTICATED', (done) => {
3833
}, 25000)
3934
})
4035

41-
test('Will use a supplied custom max backoff', (done) => {
36+
test('gRPC worker uses a supplied custom max backoff', (done) => {
4237
let durations = 0
4338

4439
const zbc = new ZeebeGrpcClient({
4540
config: {
4641
CAMUNDA_AUTH_STRATEGY: 'NONE',
47-
CAMUNDA_LOG_LEVEL: 'DEBUG',
4842
CAMUNDA_JOB_WORKER_MAX_BACKOFF_MS: 2000,
4943
},
5044
})

src/c8/lib/C8Logger.ts

+8
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,11 @@ export function getLogger(config?: Camunda8ClientConfiguration) {
5050
}
5151
return config?.logger ?? defaultLogger
5252
}
53+
54+
/**
 * Create a winston logger that additionally exposes a `trace` method.
 *
 * `trace` is the same function as the logger's `silly` method.
 *
 * @param options winston logger options, passed straight through to `winston.createLogger`
 * @returns the created logger, typed with the extra `trace` method
 */
export function createLogger(options?: winston.LoggerOptions) {
  const extended = winston.createLogger(options) as winston.Logger & {
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    trace: (message: string | undefined, ...meta: any[]) => void
  }
  extended.trace = extended.silly
  return extended
}

src/c8/lib/CamundaJobWorker.ts

+46-5
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,15 @@ type CamundaJobWorkerEvents = {
2323
maxJobsToActivate,
2424
worker,
2525
}: {
26+
/** How many jobs are currently activated */
2627
currentlyActiveJobCount: number
28+
/** The max number of jobs we are requesting to activate */
2729
maxJobsToActivate: number
30+
/** The worker name */
2831
worker: string
2932
}) => void
33+
backoff: (backoffDuration: number) => void
34+
work: (jobs: RestJob<unknown, unknown>[]) => void
3035
}
3136

3237
export interface CamundaJobWorkerConfig<
@@ -35,7 +40,7 @@ export interface CamundaJobWorkerConfig<
3540
> extends ActivateJobsRequest {
3641
inputVariableDto?: Ctor<VariablesDto>
3742
customHeadersDto?: Ctor<CustomHeadersDto>
38-
/** How often the worker will poll for new jobs. Defaults to 30s */
43+
/** How often the worker will poll for new jobs. Defaults to 300ms */
3944
pollIntervalMs?: number
4045
jobHandler: (
4146
job: RestJob<VariablesDto, CustomHeadersDto> &
@@ -45,8 +50,9 @@ export interface CamundaJobWorkerConfig<
4550
logger?: Logger
4651
/** Default: true. Start the worker polling immediately. If set to `false`, call the worker's `start()` method to start polling for work. */
4752
autoStart?: boolean
53+
maxBackoffTimeMs?: number
4854
}
49-
// Make this class extend event emitter and have a typed event 'pollError'
55+
5056
export class CamundaJobWorker<
5157
VariablesDto extends LosslessDto,
5258
CustomHeadersDto extends LosslessDto,
@@ -55,6 +61,10 @@ export class CamundaJobWorker<
5561
public capacity: number
5662
private loopHandle?: NodeJS.Timeout
5763
private pollInterval: number
64+
private pollLock: boolean = false
65+
private backoffTimer: NodeJS.Timeout | undefined
66+
private backoffRetryCount: number = 0
67+
private maxBackoffTimeMs: number
5868
public log: Logger
5969
logMeta: () => {
6070
worker: string
@@ -72,9 +82,12 @@ export class CamundaJobWorker<
7282
private readonly restClient: CamundaRestClient
7383
) {
7484
super()
75-
this.pollInterval = config.pollIntervalMs ?? 30000
85+
this.pollInterval = config.pollIntervalMs ?? 300
7686
this.capacity = this.config.maxJobsToActivate
77-
this.log = getLogger({ logger: config.logger })
87+
this.maxBackoffTimeMs =
88+
config.maxBackoffTimeMs ??
89+
restClient.getConfig().CAMUNDA_JOB_WORKER_MAX_BACKOFF_MS
90+
this.log = getLogger({ logger: config.logger ?? restClient.log })
7891
this.logMeta = () => ({
7992
worker: this.config.worker,
8093
type: this.config.type,
@@ -102,6 +115,8 @@ export class CamundaJobWorker<
102115
this.log.debug(`Stop requested`, this.logMeta())
103116
/** Stopping polling for new jobs */
104117
clearInterval(this.loopHandle)
118+
/** Do not allow the backoff retry to restart polling */
119+
clearTimeout(this.backoffTimer)
105120
return new Promise((resolve, reject) => {
106121
if (this.currentlyActiveJobCount === 0) {
107122
this.log.debug(`All jobs drained. Worker stopped.`, this.logMeta())
@@ -135,11 +150,15 @@ export class CamundaJobWorker<
135150
}
136151

137152
private poll() {
153+
if (this.pollLock) {
154+
return
155+
}
138156
this.emit('poll', {
139157
currentlyActiveJobCount: this.currentlyActiveJobCount,
140158
maxJobsToActivate: this.config.maxJobsToActivate,
141159
worker: this.config.worker,
142160
})
161+
this.pollLock = true
143162
if (this.currentlyActiveJobCount >= this.config.maxJobsToActivate) {
144163
this.log.debug(`At capacity - not requesting more jobs`, this.logMeta())
145164
return
@@ -158,10 +177,32 @@ export class CamundaJobWorker<
158177
const count = jobs.length
159178
this.currentlyActiveJobCount += count
160179
this.log.debug(`Activated ${count} jobs`, this.logMeta())
180+
this.emit('work', jobs)
161181
// The job handlers for the activated jobs will run in parallel
162182
jobs.forEach((job) => this.handleJob(job))
183+
this.pollLock = false
184+
this.backoffRetryCount = 0
185+
})
186+
.catch((e) => {
187+
// This can throw a 400 or 500 REST Error with the Content-Type application/problem+json
188+
// The schema is:
189+
// { type: string, title: string, status: number, detail: string, instance: string }
190+
this.log.error('Error during job worker poll')
191+
this.log.error(e)
192+
this.emit('pollError', e)
193+
const backoffDuration = Math.min(
194+
2000 * ++this.backoffRetryCount,
195+
this.maxBackoffTimeMs
196+
)
197+
this.log.warn(
198+
`Backing off worker poll due to failure. Next attempt will be made in ${backoffDuration}ms...`
199+
)
200+
this.emit('backoff', backoffDuration)
201+
this.backoffTimer = setTimeout(() => {
202+
this.pollLock = false
203+
this.poll()
204+
}, backoffDuration)
163205
})
164-
.catch((e) => this.emit('pollError', e))
165206
}
166207

167208
private async handleJob(

0 commit comments

Comments
 (0)