Skip to content

Commit f69f51c

Browse files
committed
fix: multiple concurrent attempts to process the queue may fail
In certain failover events, multiple entities may signal that the server selection queue should be processed. This is problematic because the queue is processed serially per invocation, even though the queue may grow during processing. Likely a more robust solution is required in the future, but for now we can prevent issues by ensuring the loop breaks if no more wait queue members are present. NODE-2472
1 parent cde11ec commit f69f51c

File tree

3 files changed

+70
-5
lines changed

3 files changed

+70
-5
lines changed

lib/core/sdam/topology.js

+4-2
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,8 @@ class Topology extends EventEmitter {
378378
let readPreference;
379379
if (selector instanceof ReadPreference) {
380380
readPreference = selector;
381+
} else if (typeof selector === 'string') {
382+
readPreference = new ReadPreference(selector);
381383
} else {
382384
translateReadPreference(options);
383385
readPreference = options.readPreference || ReadPreference.primary;
@@ -997,10 +999,9 @@ function processWaitQueue(topology) {
997999
return;
9981000
}
9991001

1000-
const isSharded = topology.description.type === TopologyType.Sharded;
10011002
const serverDescriptions = Array.from(topology.description.servers.values());
10021003
const membersToProcess = topology[kWaitQueue].length;
1003-
for (let i = 0; i < membersToProcess; ++i) {
1004+
for (let i = 0; i < membersToProcess && topology[kWaitQueue].length; ++i) {
10041005
const waitQueueMember = topology[kWaitQueue].shift();
10051006
if (waitQueueMember[kCancelled]) {
10061007
continue;
@@ -1026,6 +1027,7 @@ function processWaitQueue(topology) {
10261027
const selectedServerDescription = randomSelection(selectedDescriptions);
10271028
const selectedServer = topology.s.servers.get(selectedServerDescription.address);
10281029
const transaction = waitQueueMember.transaction;
1030+
const isSharded = topology.description.type === TopologyType.Sharded;
10291031
if (isSharded && transaction && transaction.isActive) {
10301032
transaction.pinServer(selectedServer);
10311033
}

test/unit/cmap/connection.test.js

+1-3
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,7 @@ const expect = require('chai').expect;
99
describe('Connection', function() {
1010
let server;
1111
after(() => mock.cleanup());
12-
before(() =>
13-
mock.createServer().then(s => (server = s))
14-
);
12+
before(() => mock.createServer().then(s => (server = s)));
1513

1614
it('should support fire-and-forget messages', function(done) {
1715
server.setMessageHandler(request => {

test/unit/sdam/topology.test.js

+65
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,71 @@ const expect = require('chai').expect;
77
const sinon = require('sinon');
88

99
describe('Topology (unit)', function() {
10+
it('should successfully process multiple queue processing requests', function(done) {
11+
const singleNodeIsMaster = Object.assign({}, mock.DEFAULT_ISMASTER, {
12+
maxWireVersion: 9,
13+
ismaster: true,
14+
secondary: false,
15+
setName: 'rs',
16+
me: 'a:27017',
17+
hosts: ['a:27017'],
18+
logicalSessionTimeoutMinutes: 10
19+
});
20+
21+
const topology = new Topology('a:27017', { replicaSet: 'rs' });
22+
this.sinon.stub(Server.prototype, 'connect').callsFake(function() {
23+
this.s.state = 'connected';
24+
this.emit('connect');
25+
setTimeout(
26+
() =>
27+
this.emit('descriptionReceived', new ServerDescription(this.name, singleNodeIsMaster)),
28+
100
29+
);
30+
});
31+
32+
function simulatedRetryableReadOperation(topology, callback) {
33+
topology.selectServer('primary', err => {
34+
expect(err).to.not.exist;
35+
36+
topology.selectServer('primary', err => {
37+
expect(err).to.not.exist;
38+
39+
callback();
40+
});
41+
});
42+
}
43+
44+
topology.connect(err => {
45+
expect(err).to.not.exist;
46+
this.defer(() => topology.close());
47+
48+
let selected = 0;
49+
const completionHandler = err => {
50+
expect(err).to.not.exist;
51+
52+
selected++;
53+
if (selected === 3) done();
54+
};
55+
56+
// explicitly prevent server selection by reverting to `Unknown`
57+
const server = topology.s.servers.get('a:27017');
58+
server.emit('descriptionReceived', new ServerDescription(server.name, null));
59+
process.nextTick(() => {
60+
simulatedRetryableReadOperation(topology, completionHandler);
61+
simulatedRetryableReadOperation(topology, completionHandler);
62+
63+
setTimeout(() => {
64+
server.emit(
65+
'descriptionReceived',
66+
new ServerDescription(server.name, singleNodeIsMaster)
67+
);
68+
69+
simulatedRetryableReadOperation(topology, completionHandler);
70+
}, 250);
71+
});
72+
});
73+
});
74+
1075
describe('shouldCheckForSessionSupport', function() {
1176
beforeEach(function() {
1277
this.sinon = sinon.sandbox.create();

0 commit comments

Comments
 (0)