Skip to content

Commit bf701d6

Browse files
committed
fix(topology): ensure selection wait queue is always processed
There were two bugs here. First, processing the wait queue used a dynamic upper bound while looping through members, resulting in not all members actually being processed and subsequently timing out. Second, if some sort of error occurred during applying the selector to an array of server descriptions, processing would prematurely stop, potentially leaving wait queue members stuck. NODE-2467
1 parent 361bc1e commit bf701d6

File tree

2 files changed

+64
-6
lines changed

2 files changed

+64
-6
lines changed

lib/core/sdam/topology.js

+6-6
Original file line numberDiff line numberDiff line change
@@ -434,8 +434,7 @@ class Topology extends EventEmitter {
434434
}, serverSelectionTimeoutMS);
435435
}
436436

437-
// place the member at the front of the wait queue
438-
this[kWaitQueue].unshift(waitQueueMember);
437+
this[kWaitQueue].push(waitQueueMember);
439438
processWaitQueue(this);
440439
}
441440

@@ -980,7 +979,7 @@ function srvPollingHandler(topology) {
980979

981980
function drainWaitQueue(queue, err) {
982981
while (queue.length) {
983-
const waitQueueMember = queue.pop();
982+
const waitQueueMember = queue.shift();
984983
clearTimeout(waitQueueMember.timer);
985984
if (!waitQueueMember[kCancelled]) {
986985
waitQueueMember.callback(err);
@@ -996,7 +995,8 @@ function processWaitQueue(topology) {
996995

997996
const isSharded = topology.description.type === TopologyType.Sharded;
998997
const serverDescriptions = Array.from(topology.description.servers.values());
999-
for (let i = 0; i < topology[kWaitQueue].length; ++i) {
998+
const membersToProcess = topology[kWaitQueue].length;
999+
for (let i = 0; i < membersToProcess; ++i) {
10001000
const waitQueueMember = topology[kWaitQueue].shift();
10011001
if (waitQueueMember[kCancelled]) {
10021002
continue;
@@ -1011,12 +1011,12 @@ function processWaitQueue(topology) {
10111011
} catch (e) {
10121012
clearTimeout(waitQueueMember.timer);
10131013
waitQueueMember.callback(e);
1014-
break;
1014+
continue;
10151015
}
10161016

10171017
if (selectedDescriptions.length === 0) {
10181018
topology[kWaitQueue].push(waitQueueMember);
1019-
break;
1019+
continue;
10201020
}
10211021

10221022
const selectedServerDescription = randomSelection(selectedDescriptions);

test/unit/sdam/server_selection/select_servers.test.js

+58
Original file line numberDiff line numberDiff line change
@@ -64,4 +64,62 @@ describe('selectServer', function() {
6464
});
6565
});
6666
});
67+
68+
describe('waitQueue', function() {
69+
it('should process all wait queue members, including selection with errors', function(done) {
70+
const topology = new Topology('someserver:27019');
71+
const selectServer = this.sinon
72+
.stub(Topology.prototype, 'selectServer')
73+
.callsFake(function(selector, options, callback) {
74+
const server = Array.from(this.s.servers.values())[0];
75+
selectServer.restore();
76+
callback(null, server);
77+
});
78+
79+
this.sinon.stub(Server.prototype, 'connect').callsFake(function() {
80+
this.s.state = 'connected';
81+
this.emit('connect');
82+
});
83+
84+
const toSelect = 10;
85+
let completed = 0;
86+
function finish() {
87+
completed++;
88+
console.log(completed);
89+
if (completed === toSelect) done();
90+
}
91+
92+
// methodology:
93+
// - perform 9 server selections, a few with a selector that throws an error
94+
// - ensure each selection immediately returns an empty result (gated by a boolean)
95+
// guaranteeing tha the queue will be full before the last selection
96+
// - make one last selection, but ensure that all selections are no longer blocked from
97+
// returning their value
98+
// - verify that 10 callbacks were called
99+
100+
topology.connect(err => {
101+
expect(err).to.not.exist;
102+
103+
let preventSelection = true;
104+
const anySelector = td => {
105+
if (preventSelection) return [];
106+
const server = Array.from(td.servers.values())[0];
107+
return [server];
108+
};
109+
110+
const failingSelector = () => {
111+
if (preventSelection) return [];
112+
throw new TypeError('bad news!');
113+
};
114+
115+
preventSelection = true;
116+
for (let i = 0; i < toSelect - 1; ++i) {
117+
topology.selectServer(i % 5 === 0 ? failingSelector : anySelector, finish);
118+
}
119+
120+
preventSelection = false;
121+
topology.selectServer(anySelector, finish);
122+
});
123+
});
124+
});
67125
});

0 commit comments

Comments
 (0)