Skip to content

Commit 607aa3a

Browse files
bnoordhuisrvagg
authored andcommitted
child_process: add callback parameter to .send()
Add an optional callback parameter to `ChildProcess.prototype.send()` that is invoked when the message has been sent. Juggle the control channel's reference count so that in-flight messages keep the event loop (and therefore the process) alive until they have been sent. `ChildProcess.prototype.send()` and `process.send()` used to operate synchronously but became asynchronous in commit libuv/libuv@393c1c5 ("unix: set non-block mode in uv_{pipe,tcp,udp}_open"), which landed in io.js in commit 07bd05b ("deps: update libuv to 1.2.1"). Fixes: #760 PR-URL: #2620 Reviewed-By: trevnorris - Trevor Norris <trev.norris@gmail.com> Reviewed-By: jasnell - James M Snell <jasnell@gmail.com>
1 parent 599d4f5 commit 607aa3a

File tree

5 files changed

+111
-36
lines changed

5 files changed

+111
-36
lines changed

doc/api/child_process.markdown

+14-9
Original file line numberDiff line numberDiff line change
@@ -214,13 +214,15 @@ to a process.
214214

215215
See `kill(2)`
216216

217-
### child.send(message[, sendHandle])
217+
### child.send(message[, sendHandle][, callback])
218218

219219
* `message` {Object}
220220
* `sendHandle` {Handle object}
221+
* `callback` {Function}
222+
* Return: Boolean
221223

222224
When using `child_process.fork()` you can write to the child using
223-
`child.send(message, [sendHandle])` and messages are received by
225+
`child.send(message[, sendHandle][, callback])` and messages are received by
224226
a `'message'` event on the child.
225227

226228
For example:
@@ -246,11 +248,6 @@ And then the child script, `'sub.js'` might look like this:
246248
In the child the `process` object will have a `send()` method, and `process`
247249
will emit objects each time it receives a message on its channel.
248250

249-
Please note that the `send()` method on both the parent and child are
250-
synchronous - sending large chunks of data is not advised (pipes can be used
251-
instead, see
252-
[`child_process.spawn`](#child_process_child_process_spawn_command_args_options)).
253-
254251
There is a special case when sending a `{cmd: 'NODE_foo'}` message. All messages
255252
containing a `NODE_` prefix in its `cmd` property will not be emitted in
256253
the `message` event, since they are internal messages used by Node.js core.
@@ -261,8 +258,16 @@ The `sendHandle` option to `child.send()` is for sending a TCP server or
261258
socket object to another process. The child will receive the object as its
262259
second argument to the `message` event.
263260

264-
Emits an `'error'` event if the message cannot be sent, for example because
265-
the child process has already exited.
261+
The `callback` option is a function that is invoked after the message is
262+
sent but before the target may have received it. It is called with a single
263+
argument: `null` on success, or an `Error` object on failure.
264+
265+
`child.send()` emits an `'error'` event if no callback was given and the message
266+
cannot be sent, for example because the child process has already exited.
267+
268+
Returns `true` under normal circumstances or `false` when the backlog of
269+
unsent messages exceeds a threshold that makes it unwise to send more.
270+
Use the callback mechanism to implement flow control.
266271

267272
#### Example: sending server object
268273

doc/api/cluster.markdown

+3-1
Original file line numberDiff line numberDiff line change
@@ -426,10 +426,12 @@ exit, the master may choose not to respawn a worker based on this value.
426426
// kill worker
427427
worker.kill();
428428

429-
### worker.send(message[, sendHandle])
429+
### worker.send(message[, sendHandle][, callback])
430430

431431
* `message` {Object}
432432
* `sendHandle` {Handle object}
433+
* `callback` {Function}
434+
* Return: Boolean
433435

434436
Send a message to a worker or master, optionally with a handle.
435437

lib/child_process.js

+3-7
Original file line numberDiff line numberDiff line change
@@ -48,16 +48,12 @@ exports._forkChild = function(fd) {
4848
var p = new Pipe(true);
4949
p.open(fd);
5050
p.unref();
51-
setupChannel(process, p);
52-
53-
var refs = 0;
51+
const control = setupChannel(process, p);
5452
process.on('newListener', function(name) {
55-
if (name !== 'message' && name !== 'disconnect') return;
56-
if (++refs === 1) p.ref();
53+
if (name === 'message' || name === 'disconnect') control.ref();
5754
});
5855
process.on('removeListener', function(name) {
59-
if (name !== 'message' && name !== 'disconnect') return;
60-
if (--refs === 0) p.unref();
56+
if (name === 'message' || name === 'disconnect') control.unref();
6157
});
6258
};
6359

lib/internal/child_process.js

+72-19
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,25 @@ function setupChannel(target, channel) {
397397
target._channel = channel;
398398
target._handleQueue = null;
399399

400+
const control = new class extends EventEmitter {
401+
constructor() {
402+
super();
403+
this.channel = channel;
404+
this.refs = 0;
405+
}
406+
ref() {
407+
if (++this.refs === 1) {
408+
this.channel.ref();
409+
}
410+
}
411+
unref() {
412+
if (--this.refs === 0) {
413+
this.channel.unref();
414+
this.emit('unref');
415+
}
416+
}
417+
};
418+
400419
var decoder = new StringDecoder('utf8');
401420
var jsonBuffer = '';
402421
channel.buffering = false;
@@ -446,7 +465,7 @@ function setupChannel(target, channel) {
446465
target._handleQueue = null;
447466

448467
queue.forEach(function(args) {
449-
target._send(args.message, args.handle, false);
468+
target._send(args.message, args.handle, false, args.callback);
450469
});
451470

452471
// Process a pending disconnect (if any).
@@ -478,14 +497,24 @@ function setupChannel(target, channel) {
478497
});
479498
});
480499

481-
target.send = function(message, handle) {
482-
if (!this.connected)
483-
this.emit('error', new Error('channel closed'));
484-
else
485-
this._send(message, handle, false);
500+
target.send = function(message, handle, callback) {
501+
if (typeof handle === 'function') {
502+
callback = handle;
503+
handle = undefined;
504+
}
505+
if (this.connected) {
506+
this._send(message, handle, false, callback);
507+
return;
508+
}
509+
const ex = new Error('channel closed');
510+
if (typeof callback === 'function') {
511+
process.nextTick(callback, ex);
512+
} else {
513+
this.emit('error', ex); // FIXME(bnoordhuis) Defer to next tick.
514+
}
486515
};
487516

488-
target._send = function(message, handle, swallowErrors) {
517+
target._send = function(message, handle, swallowErrors, callback) {
489518
assert(this.connected || this._channel);
490519

491520
if (message === undefined)
@@ -516,7 +545,11 @@ function setupChannel(target, channel) {
516545

517546
// Queue-up message and handle if we haven't received ACK yet.
518547
if (this._handleQueue) {
519-
this._handleQueue.push({ message: message.msg, handle: handle });
548+
this._handleQueue.push({
549+
callback: callback,
550+
handle: handle,
551+
message: message.msg,
552+
});
520553
return;
521554
}
522555

@@ -538,24 +571,43 @@ function setupChannel(target, channel) {
538571
} else if (this._handleQueue &&
539572
!(message && message.cmd === 'NODE_HANDLE_ACK')) {
540573
// Queue request anyway to avoid out-of-order messages.
541-
this._handleQueue.push({ message: message, handle: null });
574+
this._handleQueue.push({
575+
callback: callback,
576+
handle: null,
577+
message: message,
578+
});
542579
return;
543580
}
544581

545582
var req = new WriteWrap();
546-
req.oncomplete = nop;
583+
req.async = false;
584+
547585
var string = JSON.stringify(message) + '\n';
548586
var err = channel.writeUtf8String(req, string, handle);
549587

550-
if (err) {
551-
if (!swallowErrors)
552-
this.emit('error', errnoException(err, 'write'));
553-
} else if (handle && !this._handleQueue) {
554-
this._handleQueue = [];
555-
}
556-
557-
if (obj && obj.postSend) {
558-
req.oncomplete = obj.postSend.bind(null, handle);
588+
if (err === 0) {
589+
if (handle && !this._handleQueue)
590+
this._handleQueue = [];
591+
req.oncomplete = function() {
592+
if (this.async === true)
593+
control.unref();
594+
if (obj && obj.postSend)
595+
obj.postSend(handle);
596+
if (typeof callback === 'function')
597+
callback(null);
598+
};
599+
if (req.async === true) {
600+
control.ref();
601+
} else {
602+
process.nextTick(function() { req.oncomplete(); });
603+
}
604+
} else if (!swallowErrors) {
605+
const ex = errnoException(err, 'write');
606+
if (typeof callback === 'function') {
607+
process.nextTick(callback, ex);
608+
} else {
609+
this.emit('error', ex); // FIXME(bnoordhuis) Defer to next tick.
610+
}
559611
}
560612

561613
/* If the master is > 2 read() calls behind, please stop sending. */
@@ -616,6 +668,7 @@ function setupChannel(target, channel) {
616668
};
617669

618670
channel.readStart();
671+
return control;
619672
}
620673

621674

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
'use strict';
2+
const common = require('../common');
3+
const assert = require('assert');
4+
const fork = require('child_process').fork;
5+
6+
if (process.argv[2] === 'child') {
7+
process.send('ok', common.mustCall(function(err) {
8+
assert.strictEqual(err, null);
9+
}));
10+
} else {
11+
const child = fork(process.argv[1], ['child']);
12+
child.on('message', common.mustCall(function(message) {
13+
assert.strictEqual(message, 'ok');
14+
}));
15+
child.on('exit', common.mustCall(function(exitCode, signalCode) {
16+
assert.strictEqual(exitCode, 0);
17+
assert.strictEqual(signalCode, null);
18+
}));
19+
}

0 commit comments

Comments
 (0)