Skip to content

Commit 8418412

Browse files
committed
Fix intermittent test failure
1 parent 7bd2a2e commit 8418412

11 files changed

+105
-30
lines changed

.github/workflows/npm-publish.yml

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
name: npm-publish
22
on:
33
push:
4-
tags:
5-
- "v*"
4+
branches:
5+
- "master"
66
jobs:
77
test:
88
name: test

DEVELOP.md

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# Development
2+
3+
## Publishing a new NPM Package
4+
5+
NPM package publishing is handled by a GitHub workflow that uses the [publish-to-npm](https://github.com/marketplace/actions/publish-to-npm) action.
6+
7+
To publish a new package, update the version in `package.json` and push a commit whose message is "Release x.y.z", where x.y.z matches that version.

package-lock.json

+4-4
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
"build": "tsc --build src/tsconfig.json",
3131
"watch": "tsc --build src/tsconfig.json -w",
3232
"prepare": "tsc --build src/tsconfig.json",
33-
"test": "jest --runInBand --detectOpenHandles --testPathIgnorePatterns integration local-integration disconnection",
33+
"test": "jest --detectOpenHandles --testPathIgnorePatterns integration local-integration disconnection",
3434
"test:integration": "jest --runInBand --testPathIgnorePatterns disconnection --detectOpenHandles --verbose true",
3535
"test:local": "jest --runInBand --verbose true --detectOpenHandles local-integration",
3636
"test:disconnect": "jest --runInBand --verbose true --detectOpenHandles disconnection",
@@ -65,7 +65,7 @@
6565
"chalk": "^2.4.2",
6666
"console-stamp": "^0.2.7",
6767
"dayjs": "^1.8.15",
68-
"debug": "^4.1.1",
68+
"debug": "^4.2.0",
6969
"fast-xml-parser": "^3.12.12",
7070
"fp-ts": "^2.5.1",
7171
"got": "^9.6.0",

src/__tests__/disconnection/disconnect.spec.ts

+35-7
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ process.env.ZEEBE_NODE_LOG_LEVEL = process.env.ZEEBE_NODE_LOG_LEVEL || 'NONE'
66

77
const ZEEBE_DOCKER_TAG = '0.24.2'
88

9-
jest.setTimeout(120000)
9+
jest.setTimeout(900000)
1010

1111
let container
1212
let worker
@@ -16,14 +16,19 @@ afterEach(async () => {
1616
await worker?.close()
1717
})
1818

19+
function log(msg) {
20+
// tslint:disable-next-line: no-console
21+
console.log(new Date().toString(), msg) // @DEBUG
22+
}
23+
1924
test('reconnects after a pod reschedule', async done => {
2025
let readyCount = 0
2126
let errorCount = 0
2227
const delay = timeout =>
2328
new Promise(res => setTimeout(() => res(), timeout))
2429

2530
// tslint:disable-next-line: no-console
26-
console.log('##### Starting container') // @DEBUG
31+
log('##### Starting container') // @DEBUG
2732

2833
container = await new GenericContainer(
2934
'camunda/zeebe',
@@ -38,10 +43,18 @@ test('reconnects after a pod reschedule', async done => {
3843
await delay(10000)
3944

4045
const zbc = new ZBClient(`localhost`)
46+
// tslint:disable-next-line: no-console
47+
log('##### Deploying workflow') // @DEBUG
48+
4149
await zbc.deployWorkflow('./src/__tests__/testdata/disconnection.bpmn')
4250
worker = zbc
4351
.createWorker({
52+
longPoll: 10000,
53+
pollInterval: 300,
4454
taskHandler: (_, complete) => {
55+
// tslint:disable-next-line: no-console
56+
log('##### Executing task handler') // @DEBUG
57+
4558
complete.success()
4659
},
4760
taskType: 'disconnection-task',
@@ -54,7 +67,7 @@ test('reconnects after a pod reschedule', async done => {
5467
})
5568

5669
// tslint:disable-next-line: no-console
57-
console.log('##### Deploying workflow') // @DEBUG
70+
log('##### Starting workflow') // @DEBUG
5871

5972
const wf = await zbc.createWorkflowInstanceWithResult({
6073
bpmnProcessId: 'disconnection',
@@ -64,15 +77,18 @@ test('reconnects after a pod reschedule', async done => {
6477
expect(wf.bpmnProcessId).toBeTruthy()
6578

6679
// tslint:disable-next-line: no-console
67-
console.log('##### Stopping container...') // @DEBUG
80+
log('##### Workflow finished') // @DEBUG
81+
82+
// tslint:disable-next-line: no-console
83+
log('##### Stopping container...') // @DEBUG
6884

6985
await container.stop()
7086

7187
// tslint:disable-next-line: no-console
72-
console.log('##### Container stopped.') // @DEBUG
88+
log('##### Container stopped.') // @DEBUG
7389

7490
// tslint:disable-next-line: no-console
75-
console.log('##### Starting container....') // @DEBUG
91+
log('##### Starting container....') // @DEBUG
7692

7793
container = await new GenericContainer(
7894
'camunda/zeebe',
@@ -81,15 +97,27 @@ test('reconnects after a pod reschedule', async done => {
8197
26500
8298
)
8399
.withExposedPorts(26500)
100+
.withEnv('ZEEBE_LOG_LEVEL', 'trace')
84101
.withWaitStrategy(Wait.forLogMessage('Bootstrap Broker-0 succeeded.'))
85102
.start()
86103

87104
// tslint:disable-next-line: no-console
88-
// console.log('##### Container started.') // @DEBUG
105+
log('##### Container started.') // @DEBUG
89106

90107
await delay(10000)
108+
109+
// tslint:disable-next-line: no-console
110+
log('##### Deploying workflow 2') // @DEBUG
91111
await zbc.deployWorkflow('./src/__tests__/testdata/disconnection.bpmn')
92112

113+
// tslint:disable-next-line: no-console
114+
// console.log('Workflow 2 deployed', _) // @DEBUG
115+
116+
await delay(15000)
117+
118+
// tslint:disable-next-line: no-console
119+
log('##### Starting workflow 2') // @DEBUG
120+
93121
const wf1 = await zbc.createWorkflowInstanceWithResult('disconnection', {})
94122
expect(wf1.bpmnProcessId).toBeTruthy()
95123
await worker.close()

src/__tests__/integration/Client-ConnectionError.spec.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@ test(`Calls the onConnectionError handler if there is no broker and eagerConnect
1818
}, 7000)
1919
})
2020

21-
test(`Sets connected:undefined if there is no broker and no setting of eagerConnection`, async done => {
21+
test(`Sets connected:false if there is no broker and no setting of eagerConnection`, async done => {
2222
const zbc2 = new ZBClient('localtoast: 267890') // Broker doesn't exist!!!
2323
setTimeout(async () => {
24-
expect(zbc2.connected).toBe(undefined)
24+
expect(zbc2.connected).toBe(false)
2525
await zbc2.close()
2626
done()
2727
}, 5000)

src/__tests__/integration/Worker-LongPoll.spec.ts

-5
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,9 @@ process.env.ZEEBE_NODE_LOG_LEVEL = process.env.ZEEBE_NODE_LOG_LEVEL || 'NONE'
77
let zbcLongPoll
88

99
afterAll(async () => {
10-
const zbc = new ZBClient()
1110
await zbcLongPoll.close()
12-
await zbc.close()
1311
})
1412

15-
/**
16-
* This test is currently disabled
17-
*/
1813
test('Does long poll by default', async done => {
1914
jest.setTimeout(40000)
2015
zbcLongPoll = new ZBClient({

src/__tests__/integration/Worker-onReady.spec.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ test(`Does set connected: true if there is a broker and eagerConnection: true`,
4848
test(`Does not set connected: true if there is a broker and eagerConnection: false`, done => {
4949
const zbc2 = new ZBClient()
5050
setTimeout(async () => {
51-
expect(zbc2.connected).toBe(undefined)
51+
expect(zbc2.connected).toBe(false)
5252
await zbc2.close()
5353
done()
5454
}, 7000)

src/__tests__/local-integration/OnConnectionError.spec.ts

+3-2
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ test(`Calls ZBClient onConnectionError when there no broker, for the client and
6565
})
6666
setTimeout(() => {
6767
zbc2.close()
68-
expect(calledD).toBe(1)
68+
expect(calledD).toBe(2)
6969
done()
7070
}, 10000)
7171
})
@@ -82,7 +82,7 @@ test(`Debounces onConnectionError`, async done => {
8282
})
8383
setTimeout(() => {
8484
zbc2.close()
85-
expect(called).toBe(1) // toBeLessThanOrEqual(1)
85+
expect(called).toBe(2) // toBeLessThanOrEqual(1)
8686
done()
8787
}, 15000)
8888
})
@@ -108,6 +108,7 @@ test(`Does not call the onConnectionError handler if there is a business error`,
108108
calledF++
109109
},
110110
})
111+
111112
zbc2.createWorkflowInstance(wf, {}).catch(() => {
112113
wf = 'throw error away'
113114
})

src/lib/GrpcClient.ts

+35-4
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
status as statusJS,
77
} from '@grpc/grpc-js'
88
import { loadSync, Options, PackageDefinition } from '@grpc/proto-loader'
9+
import * as _debug from 'debug'
910
import { EventEmitter } from 'events'
1011
import {
1112
Client,
@@ -20,6 +21,8 @@ import { BasicAuthConfig } from './interfaces'
2021
import { Loglevel } from './interfaces-published-contract'
2122
import { OAuthProvider } from './OAuthProvider'
2223

24+
const debug = _debug.default('grpc')
25+
2326
const useJS = process.env.ZEEBE_NODE_PUREJS?.toUpperCase() === 'TRUE'
2427
const credentials = useJS ? credentialsJS : credentialsC
2528
const InterceptingCall = useJS ? InterceptingCallJS : InterceptingCallC
@@ -222,12 +225,16 @@ export class GrpcClient extends EventEmitter {
222225
* The maximum time between subsequent connection attempts,
223226
* in ms
224227
*/
225-
'grpc.max_reconnect_backoff_ms': 30000,
228+
'grpc.max_reconnect_backoff_ms': 10000,
226229
/**
227230
* The minimum time between subsequent connection attempts,
228-
* in ms
231+
* in ms. Default is 1000ms, but this can cause an SSL Handshake failure.
232+
* This causes an intermittent failure in the Worker-LongPoll test when run
233+
* against Camunda Cloud.
234+
* Raised to 5000ms.
235+
* See: https://github.com/grpc/grpc/issues/8382#issuecomment-259482949
229236
*/
230-
'grpc.min_reconnect_backoff_ms': 1000,
237+
'grpc.min_reconnect_backoff_ms': 5000,
231238
/**
232239
* After a duration of this time the client/server
233240
* pings its peer to see if the transport is still alive.
@@ -240,7 +247,7 @@ export class GrpcClient extends EventEmitter {
240247
* not receive the ping ack, it will close the
241248
* transport. Int valued, milliseconds.
242249
*/
243-
'grpc.keepalive_timeout_ms': 60000,
250+
'grpc.keepalive_timeout_ms': 120000,
244251
'grpc.http2.min_time_between_pings_ms': 60000,
245252
/**
246253
* Minimum allowed time between a server receiving
@@ -267,6 +274,12 @@ export class GrpcClient extends EventEmitter {
267274
})
268275
this.listNameMethods = []
269276

277+
this.client.waitForReady(10000, error =>
278+
error
279+
? this.emit(MiddlewareSignals.Event.Error, error)
280+
: this.emit(MiddlewareSignals.Event.Ready)
281+
)
282+
270283
for (const key in listMethods) {
271284
if (listMethods[key]) {
272285
const methodName = listMethods[key].originalName as string
@@ -286,6 +299,7 @@ export class GrpcClient extends EventEmitter {
286299
)
287300
try {
288301
const metadata = await this.getAuthToken()
302+
289303
stream = this.client[methodName](
290304
timeNormalisedRequest,
291305
metadata
@@ -304,6 +318,19 @@ export class GrpcClient extends EventEmitter {
304318
),
305319
}
306320
}
321+
322+
// This deals with the case where during a broker restart the call returns a stream
323+
// but that stream is not a legit Gateway activation. In that case, the Gateway will
324+
// never time out or close the stream. So we have to manage that case.
325+
const clientsideTimeoutDuration =
326+
Duration.milliseconds.from(this.longPoll!) + 1000
327+
const clientSideTimeout = setTimeout(() => {
328+
debug(
329+
`Triggered client-side timeout after ${clientsideTimeoutDuration}ms`
330+
)
331+
stream.emit('end')
332+
}, clientsideTimeoutDuration)
333+
307334
/**
308335
* Once this gets attached here, it is attached to *all* calls
309336
* This is an issue if you do a sync call like cancelWorkflowSync
@@ -312,6 +339,8 @@ export class GrpcClient extends EventEmitter {
312339
* streaming calls, and each worker, which only does streaming calls
313340
*/
314341
stream.on('error', (error: GrpcStreamError) => {
342+
clearTimeout(clientSideTimeout)
343+
debug(`Error`, error)
315344
this.emit(MiddlewareSignals.Event.Error)
316345
if (error.message.includes('14 UNAVAILABLE')) {
317346
this.emit(
@@ -341,6 +370,8 @@ export class GrpcClient extends EventEmitter {
341370
`gRPC Status event: ${JSON.stringify(s)}`
342371
)
343372
)
373+
stream.on('end', () => clearTimeout(clientSideTimeout))
374+
344375
return stream
345376
}
346377

src/lib/ZBWorkerBase.ts

+14-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { ClientReadableStreamImpl } from '@grpc/grpc-js/build/src/call'
22
import { Chalk } from 'chalk'
3+
import * as _debug from 'debug'
34
import { EventEmitter } from 'events'
45
import { Duration, MaybeTimeDuration } from 'typed-duration'
56
import * as uuid from 'uuid'
@@ -11,6 +12,8 @@ import { ActivateJobsRequest, ActivateJobsResponse } from './interfaces-grpc'
1112
import { ZBClientOptions } from './interfaces-published-contract'
1213
import { TypedEmitter } from './TypedEmitter'
1314

15+
const debug = _debug.default('worker')
16+
1417
const MIN_ACTIVE_JOBS_RATIO_BEFORE_ACTIVATING_JOBS = 0.3
1518

1619
const CapacityEvent = {
@@ -99,6 +102,7 @@ export class ZBWorkerBase<
99102
private maxActiveJobsBeforeReactivation: number
100103
private pollInterval: MaybeTimeDuration
101104
private pollLoop: NodeJS.Timeout
105+
private pollMutex: boolean = false
102106

103107
constructor({
104108
grpcClient,
@@ -369,7 +373,7 @@ export class ZBWorkerBase<
369373
}
370374

371375
private async poll() {
372-
if (this.closePromise) {
376+
if (this.closePromise || this.pollMutex) {
373377
return
374378
}
375379
const isOverCapacity =
@@ -383,6 +387,8 @@ export class ZBWorkerBase<
383387
if (this.jobStream) {
384388
return
385389
}
390+
this.pollMutex = true
391+
debug('Polling...')
386392
this.logger.logDebug('Activating Jobs...')
387393
const id = uuid.v4()
388394
const jobStream = await this.activateJobs(id)
@@ -421,6 +427,7 @@ export class ZBWorkerBase<
421427
if (jobStream.error) {
422428
this.logger.logError({ id, error: jobStream.error.message })
423429
}
430+
this.pollMutex = false
424431
}
425432

426433
private async activateJobs(id: string) {
@@ -435,6 +442,7 @@ export class ZBWorkerBase<
435442
if (this.debugMode) {
436443
this.logger.logDebug(`Activating Jobs...`)
437444
}
445+
debug('Activating Jobs')
438446
let stream: any
439447

440448
const amount = this.maxJobsToActivate - this.activeJobs
@@ -453,6 +461,11 @@ export class ZBWorkerBase<
453461
requestTimeout
454462
)}, job timeout: ${Duration.value.of(this.timeout)}`
455463
)
464+
debug(
465+
`Requesting ${amount} jobs on [${id}] with requestTimeout ${Duration.value.of(
466+
requestTimeout
467+
)}, job timeout: ${Duration.value.of(this.timeout)}`
468+
)
456469

457470
try {
458471
stream = await this.grpcClient.activateJobsStream(

0 commit comments

Comments
 (0)