Skip to content

Commit 1738c77

Browse files
committed
streams: introduce StreamWrap and JSStream
Introduce a way to wrap plain-js `stream.Duplex` streams into C++ StreamBase's child class. With such method at hand it is now possible to pass `stream.Duplex` instance as a `socket` parameter to `tls.connect()`. PR-URL: #926 Reviewed-By: Chris Dickinson <christopher.s.dickinson@gmail.com>
1 parent e00c938 commit 1738c77

15 files changed

+498
-52
lines changed

lib/_stream_wrap.js

+118
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
const util = require('util');
2+
const Socket = require('net').Socket;
3+
const JSStream = process.binding('js_stream').JSStream;
4+
const uv = process.binding('uv');
5+
6+
function StreamWrap(stream) {
7+
var handle = new JSStream();
8+
9+
this.stream = stream;
10+
11+
var self = this;
12+
handle.close = function(cb) {
13+
cb();
14+
};
15+
handle.isAlive = function() {
16+
return self.isAlive();
17+
};
18+
handle.isClosing = function() {
19+
return self.isClosing();
20+
};
21+
handle.onreadstart = function() {
22+
return self.readStart();
23+
};
24+
handle.onreadstop = function() {
25+
return self.readStop();
26+
};
27+
handle.onshutdown = function(req) {
28+
return self.shutdown(req);
29+
};
30+
handle.onwrite = function(req, bufs) {
31+
return self.write(req, bufs);
32+
};
33+
34+
this.stream.pause();
35+
this.stream.on('data', function(chunk) {
36+
self._handle.readBuffer(chunk);
37+
});
38+
this.stream.once('end', function() {
39+
self._handle.emitEOF();
40+
});
41+
this.stream.on('error', function(err) {
42+
self.emit('error', err);
43+
});
44+
45+
Socket.call(this, {
46+
handle: handle
47+
});
48+
}
49+
util.inherits(StreamWrap, Socket);
50+
module.exports = StreamWrap;
51+
52+
// require('_stream_wrap').StreamWrap
53+
StreamWrap.StreamWrap = StreamWrap;
54+
55+
StreamWrap.prototype.isAlive = function isAlive() {
56+
return this.readable && this.writable;
57+
};
58+
59+
StreamWrap.prototype.isClosing = function isClosing() {
60+
return !this.isAlive();
61+
};
62+
63+
StreamWrap.prototype.readStart = function readStart() {
64+
this.stream.resume();
65+
return 0;
66+
};
67+
68+
StreamWrap.prototype.readStop = function readStop() {
69+
this.stream.pause();
70+
return 0;
71+
};
72+
73+
StreamWrap.prototype.shutdown = function shutdown(req) {
74+
var self = this;
75+
76+
this.stream.end(function() {
77+
// Ensure that write was dispatched
78+
setImmediate(function() {
79+
self._handle.finishShutdown(req, 0);
80+
});
81+
});
82+
return 0;
83+
};
84+
85+
StreamWrap.prototype.write = function write(req, bufs) {
86+
var pending = bufs.length;
87+
var self = this;
88+
89+
self.stream.cork();
90+
bufs.forEach(function(buf) {
91+
self.stream.write(buf, done);
92+
});
93+
self.stream.uncork();
94+
95+
function done(err) {
96+
if (!err && --pending !== 0)
97+
return;
98+
99+
// Ensure that this is called once in case of error
100+
pending = 0;
101+
102+
// Ensure that write was dispatched
103+
setImmediate(function() {
104+
var errCode = 0;
105+
if (err) {
106+
if (err.code && uv['UV_' + err.code])
107+
errCode = uv['UV_' + err.code];
108+
else
109+
errCode = uv.UV_EPIPE;
110+
}
111+
112+
self._handle.doAfterWrite(req);
113+
self._handle.finishWrite(req, errCode);
114+
});
115+
}
116+
117+
return 0;
118+
};

lib/_tls_wrap.js

+13-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ const tls = require('tls');
77
const util = require('util');
88
const listenerCount = require('events').listenerCount;
99
const common = require('_tls_common');
10+
const StreamWrap = require('_stream_wrap').StreamWrap;
11+
const Duplex = require('stream').Duplex;
1012
const debug = util.debuglog('tls');
1113
const Timer = process.binding('timer_wrap').Timer;
1214
const tls_wrap = process.binding('tls_wrap');
@@ -224,6 +226,10 @@ function TLSSocket(socket, options) {
224226
this.authorized = false;
225227
this.authorizationError = null;
226228

229+
// Wrap plain JS Stream into StreamWrap
230+
if (!(socket instanceof net.Socket) && socket instanceof Duplex)
231+
socket = new StreamWrap(socket);
232+
227233
// Just a documented property to make secure sockets
228234
// distinguishable from regular ones.
229235
this.encrypted = true;
@@ -280,7 +286,8 @@ TLSSocket.prototype._wrapHandle = function(handle) {
280286
// Proxy HandleWrap, PipeWrap and TCPWrap methods
281287
proxiedMethods.forEach(function(name) {
282288
res[name] = function methodProxy() {
283-
return handle[name].apply(handle, arguments);
289+
if (handle[name])
290+
return handle[name].apply(handle, arguments);
284291
};
285292
});
286293

@@ -373,7 +380,7 @@ TLSSocket.prototype._init = function(socket) {
373380
this.setTimeout(options.handshakeTimeout, this._handleTimeout);
374381

375382
// Socket already has some buffered data - emulate receiving it
376-
if (socket && socket._readableState.length) {
383+
if (socket && socket._readableState && socket._readableState.length) {
377384
var buf;
378385
while ((buf = socket.read()) !== null)
379386
ssl.receive(buf);
@@ -388,6 +395,10 @@ TLSSocket.prototype._init = function(socket) {
388395
self._connecting = false;
389396
self.emit('connect');
390397
});
398+
399+
socket.on('error', function(err) {
400+
self._tlsError(err);
401+
});
391402
}
392403

393404
// Assume `tls.connect()`

node.gyp

+3
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
'lib/_stream_duplex.js',
5757
'lib/_stream_transform.js',
5858
'lib/_stream_passthrough.js',
59+
'lib/_stream_wrap.js',
5960
'lib/string_decoder.js',
6061
'lib/sys.js',
6162
'lib/timers.js',
@@ -95,6 +96,7 @@
9596
'src/fs_event_wrap.cc',
9697
'src/cares_wrap.cc',
9798
'src/handle_wrap.cc',
99+
'src/js_stream.cc',
98100
'src/node.cc',
99101
'src/node_buffer.cc',
100102
'src/node_constants.cc',
@@ -132,6 +134,7 @@
132134
'src/env.h',
133135
'src/env-inl.h',
134136
'src/handle_wrap.h',
137+
'src/js_stream.h',
135138
'src/node.h',
136139
'src/node_buffer.h',
137140
'src/node_constants.h',

src/async-wrap.h

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ namespace node {
1717
V(FSREQWRAP) \
1818
V(GETADDRINFOREQWRAP) \
1919
V(GETNAMEINFOREQWRAP) \
20+
V(JSSTREAM) \
2021
V(PIPEWRAP) \
2122
V(PROCESSWRAP) \
2223
V(QUERYWRAP) \

src/env.h

+7
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ namespace node {
107107
V(ipv4_string, "IPv4") \
108108
V(ipv6_lc_string, "ipv6") \
109109
V(ipv6_string, "IPv6") \
110+
V(isalive_string, "isAlive") \
111+
V(isclosing_string, "isClosing") \
110112
V(issuer_string, "issuer") \
111113
V(issuercert_string, "issuerCertificate") \
112114
V(kill_signal_string, "killSignal") \
@@ -141,9 +143,13 @@ namespace node {
141143
V(onnewsessiondone_string, "onnewsessiondone") \
142144
V(onocspresponse_string, "onocspresponse") \
143145
V(onread_string, "onread") \
146+
V(onreadstart_string, "onreadstart") \
147+
V(onreadstop_string, "onreadstop") \
144148
V(onselect_string, "onselect") \
149+
V(onshutdown_string, "onshutdown") \
145150
V(onsignal_string, "onsignal") \
146151
V(onstop_string, "onstop") \
152+
V(onwrite_string, "onwrite") \
147153
V(output_string, "output") \
148154
V(order_string, "order") \
149155
V(owner_string, "owner") \
@@ -225,6 +231,7 @@ namespace node {
225231
V(context, v8::Context) \
226232
V(domain_array, v8::Array) \
227233
V(fs_stats_constructor_function, v8::Function) \
234+
V(jsstream_constructor_template, v8::FunctionTemplate) \
228235
V(module_load_list_array, v8::Array) \
229236
V(pipe_constructor_template, v8::FunctionTemplate) \
230237
V(process_object, v8::Object) \

0 commit comments

Comments
 (0)