-
Notifications
You must be signed in to change notification settings - Fork 31.1k
/
Copy pathround_robin_handle.js
125 lines (100 loc) · 2.91 KB
/
round_robin_handle.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
'use strict';
// This module exports RoundRobinHandle: it creates one listening server and
// hands each accepted connection handle to one of the registered workers.
//
// NOTE(review): `primordials` and `internalBinding` are not declared in this
// file; presumably they are provided by the Node.js internal module loader.
const {
ArrayIsArray,
Boolean,
Map,
} = primordials;
const assert = require('internal/assert');
const net = require('net');
// sendHelper() is used by handoff() to deliver a message plus a connection
// handle to `worker.process` and to receive the worker's reply.
const { sendHelper } = require('internal/cluster/utils');
// `constants.UV_TCP_IPV6ONLY` is the bit tested against the `flags` option.
const { constants } = internalBinding('tcp_wrap');
module.exports = RoundRobinHandle;
/**
 * Creates a listening server and, once it is listening, takes over its
 * underlying handle so that every incoming connection is routed through
 * `distribute()` instead of the server's own connection listener.
 *
 * Listen target, in order of precedence:
 *   1. `fd` when `fd >= 0`,
 *   2. `port` (with `address` as host) when `port >= 0`,
 *   3. otherwise `address` is passed to `listen()` as-is (UNIX socket path).
 *
 * @param {*} key Value echoed to workers in every `newconn` message.
 * @param {string} address Host name/IP, or a socket path (see above).
 * @param {object} options
 * @param {number} [options.port]
 * @param {number} [options.fd]
 * @param {number} [options.flags] Only the `UV_TCP_IPV6ONLY` bit is read.
 */
function RoundRobinHandle(key, address, { port, fd, flags }) {
  this.key = key;
  this.all = new Map();   // Every registered worker, keyed by worker id.
  this.free = new Map();  // Workers waiting for a connection to be assigned.
  this.handles = [];      // Accepted connections not yet handed to a worker.
  this.handle = null;     // Listening handle; populated on 'listening'.

  // The server-level connection listener is `assert.fail`: connections are
  // intercepted on the raw handle below, so it is not expected to ever run.
  const server = net.createServer(assert.fail);
  this.server = server;

  if (fd >= 0) {
    server.listen({ fd });
  } else if (port >= 0) {
    // Currently, net module only supports `ipv6Only` option in `flags`.
    const ipv6Only = Boolean(flags & constants.UV_TCP_IPV6ONLY);
    server.listen({ port, host: address, ipv6Only });
  } else {
    server.listen(address);  // UNIX socket path.
  }

  server.once('listening', () => {
    // Steal the listening handle from the server and drop the server itself;
    // `this.server === null` is what add() uses to detect "already bound".
    const listeningHandle = server._handle;
    listeningHandle.onconnection =
      (err, handle) => this.distribute(err, handle);
    this.handle = listeningHandle;
    server._handle = null;
    this.server = null;
  });
}
/**
 * Registers `worker` with this handle and reports the outcome via `send`.
 *
 * Once the listening handle is available, `send` is called with
 * `(null, { sockname }, null)` when the handle exposes `getsockname`, or
 * `(null, null, null)` otherwise (UNIX socket). The worker is then offered
 * any connection that is already queued. If the server is still binding,
 * this is deferred until 'listening'; a bind 'error' results in
 * `send(err.errno, null)`.
 *
 * Asserts that the worker id has not been registered before.
 *
 * @param {{ id: * }} worker
 * @param {Function} send Callback `(errno, reply, handle)`.
 */
RoundRobinHandle.prototype.add = function(worker, send) {
  assert(this.all.has(worker.id) === false);
  this.all.set(worker.id, worker);

  const onBound = () => {
    const listeningHandle = this.handle;
    if (listeningHandle.getsockname) {
      const sockname = {};
      listeningHandle.getsockname(sockname);
      // TODO(bnoordhuis) Check err.
      send(null, { sockname }, null);
    } else {
      send(null, null, null);  // UNIX socket.
    }

    // In case there are connections pending.
    this.handoff(worker);
  };

  const pendingServer = this.server;
  if (pendingServer === null) {
    // The constructor's 'listening' handler already ran: reply right away.
    return onBound();
  }

  // Still busy binding.
  pendingServer.once('listening', onBound);
  pendingServer.once('error', (err) => {
    send(err.errno, null);
  });
};
/**
 * Unregisters `worker`. When it was the last registered worker, every queued
 * connection and the listening handle (if one exists yet) are closed.
 *
 * @param {{ id: * }} worker
 * @returns {boolean} `true` only when the last worker was removed and this
 *   handle has been torn down; `false` if the worker was unknown or other
 *   workers remain.
 */
RoundRobinHandle.prototype.remove = function(worker) {
  const existed = this.all.delete(worker.id);

  if (!existed)
    return false;

  this.free.delete(worker.id);

  if (this.all.size !== 0)
    return false;

  // No worker is left to accept them: drop all queued connections.
  for (const handle of this.handles) {
    handle.close();
  }
  this.handles = [];

  // `this.handle` stays null until the 'listening' event has fired, so the
  // last worker can be removed before there is anything to close. Calling
  // close() unconditionally would throw a TypeError in that window.
  // NOTE(review): a server that is still binding is not closed here.
  if (this.handle !== null) {
    this.handle.close();
    this.handle = null;
  }

  return true;
};
/**
 * Queues an accepted connection and, if a worker is waiting, starts handing
 * queued connections to the first waiting worker.
 *
 * @param {number} err Truthy when the connection callback reports a failure;
 *   `handoff()` passes `0` when re-queueing a rejected connection.
 * @param {object} handle The accepted connection handle.
 */
RoundRobinHandle.prototype.distribute = function(err, handle) {
  // On failure there is no usable connection to queue. Pushing the missing
  // handle anyway would poison the queue: handoff() treats an `undefined`
  // entry as "queue empty", and remove() calls close() on every entry.
  if (err)
    return;

  this.handles.push(handle);

  // First entry of the `free` map, i.e. the longest-waiting worker (Maps
  // iterate in insertion order); `undefined` when no worker is waiting.
  const [ workerEntry ] = this.free;

  if (ArrayIsArray(workerEntry)) {
    const [ workerId, worker ] = workerEntry;
    this.free.delete(workerId);
    this.handoff(worker);
  }
};
/**
 * Gives the next queued connection to `worker`, or parks the worker in the
 * `free` map when nothing is queued. After the worker replies, the worker is
 * offered the next connection again.
 *
 * @param {{ id: *, process: * }} worker
 */
RoundRobinHandle.prototype.handoff = function(worker) {
  const isRegistered = this.all.has(worker.id);
  if (!isRegistered) {
    // Worker is closing (or has closed) the server.
    return;
  }

  const connection = this.handles.shift();
  if (connection === undefined) {
    // Nothing pending: add to ready queue again.
    this.free.set(worker.id, worker);
    return;
  }

  const onReply = (reply) => {
    if (reply.accepted) {
      // Close this side's reference to the connection the worker accepted.
      connection.close();
    } else {
      // Worker is shutting down. Send to another.
      this.distribute(0, connection);
    }

    this.handoff(worker);
  };

  sendHelper(
    worker.process,
    { act: 'newconn', key: this.key },
    connection,
    onReply,
  );
};