Skip to content

Commit 4e224e5

Browse files
authored
fix: wait in queue to be ready in getNextJob fixes OptimalBits#1852
1 parent 29e4115 commit 4e224e5

File tree

2 files changed

+22
-19
lines changed

2 files changed

+22
-19
lines changed

lib/process/sandbox.js

-3
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,6 @@ module.exports = function(processFile, childPool) {
3636
case 'log':
3737
job.log(msg.value);
3838
break;
39-
case 'update':
40-
job.update(msg.value);
41-
break;
4239
}
4340
};
4441

lib/queue.js

+22-16
Original file line numberDiff line numberDiff line change
@@ -1129,7 +1129,7 @@ Queue.prototype.multi = function() {
11291129
/**
11301130
Returns a promise that resolves to the next job in queue.
11311131
*/
1132-
Queue.prototype.getNextJob = function() {
1132+
Queue.prototype.getNextJob = async function() {
11331133
if (this.closing) {
11341134
return Promise.resolve();
11351135
}
@@ -1138,27 +1138,33 @@ Queue.prototype.getNextJob = function() {
11381138
//
11391139
// Waiting for new jobs to arrive
11401140
//
1141-
return this.bclient
1142-
.brpoplpush(this.keys.wait, this.keys.active, this.settings.drainDelay)
1143-
.then(
1144-
jobId => {
1145-
if (jobId) {
1146-
return this.moveToActive(jobId);
1147-
}
1148-
},
1149-
err => {
1150-
// Swallow error if locally paused since we did force a disconnection
1151-
if (!(this.paused && err.message === 'Connection is closed.')) {
1152-
throw err;
1153-
}
1154-
}
1141+
try {
1142+
const jobId = await this.bclient.brpoplpush(
1143+
this.keys.wait,
1144+
this.keys.active,
1145+
this.settings.drainDelay
11551146
);
1147+
1148+
if (jobId) {
1149+
return this.moveToActive(jobId);
1150+
}
1151+
} catch (err) {
1152+
err => {
1153+
// Swallow error if locally paused since we did force a disconnection
1154+
if (!(this.paused && err.message === 'Connection is closed.')) {
1155+
throw err;
1156+
}
1157+
};
1158+
}
11561159
} else {
11571160
return this.moveToActive();
11581161
}
11591162
};
11601163

1161-
Queue.prototype.moveToActive = function(jobId) {
1164+
Queue.prototype.moveToActive = async function(jobId) {
1165+
// For manual retrieving jobs we need to wait for the queue to be ready.
1166+
await this.isReady();
1167+
11621168
return scripts.moveToActive(this, jobId).then(([jobData, jobId]) => {
11631169
return this.nextJobFromJobData(jobData, jobId);
11641170
});

0 commit comments

Comments
 (0)