Skip to content

Commit 3d3709b

Browse files
committed
...
1 parent b6444ee commit 3d3709b

File tree

2 files changed

+74
-56
lines changed

2 files changed

+74
-56
lines changed

lib/_stream_wrap.js

+60-35
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ const util = require('util');
55
const Socket = require('net').Socket;
66
const JSStream = process.binding('js_stream').JSStream;
77
const uv = process.binding('uv');
8+
const debug = util.debuglog('stream_wrap');
89

910
function StreamWrap(stream) {
1011
const handle = new JSStream();
@@ -15,6 +16,7 @@ function StreamWrap(stream) {
1516

1617
const self = this;
1718
handle.close = function(cb) {
19+
debug('close');
1820
self.doClose(cb);
1921
};
2022
handle.isAlive = function() {
@@ -40,18 +42,23 @@ function StreamWrap(stream) {
4042
this.stream.on('error', function(err) {
4143
self.emit('error', err);
4244
});
43-
44-
Socket.call(this, {
45-
handle: handle
46-
});
47-
4845
this.stream.on('data', function(chunk) {
49-
if (self._handle)
50-
self._handle.readBuffer(chunk);
46+
setImmediate(function() {
47+
debug('data', chunk.length);
48+
if (self._handle)
49+
self._handle.readBuffer(chunk);
50+
});
5151
});
5252
this.stream.once('end', function() {
53-
if (self._handle)
54-
self._handle.emitEOF();
53+
setImmediate(function() {
54+
debug('end');
55+
if (self._handle)
56+
self._handle.emitEOF();
57+
});
58+
});
59+
60+
Socket.call(this, {
61+
handle: handle
5562
});
5663
}
5764
util.inherits(StreamWrap, Socket);
@@ -61,11 +68,11 @@ module.exports = StreamWrap;
6168
StreamWrap.StreamWrap = StreamWrap;
6269

6370
StreamWrap.prototype.isAlive = function isAlive() {
64-
return this.readable && this.writable;
71+
return true;
6572
};
6673

6774
StreamWrap.prototype.isClosing = function isClosing() {
68-
return !this.isAlive();
75+
return !this.readable || !this.writable;
6976
};
7077

7178
StreamWrap.prototype.readStart = function readStart() {
@@ -79,11 +86,16 @@ StreamWrap.prototype.readStop = function readStop() {
7986
};
8087

8188
StreamWrap.prototype.doShutdown = function doShutdown(req) {
89+
const self = this;
8290
const handle = this._handle;
91+
const item = this._enqueue('shutdown', req);
8392

8493
this.stream.end(function() {
8594
// Ensure that write was dispatched
8695
setImmediate(function() {
96+
if (!self._dequeue(item))
97+
return;
98+
8799
handle.finishShutdown(req, 0);
88100
});
89101
});
@@ -97,7 +109,7 @@ StreamWrap.prototype.doWrite = function doWrite(req, bufs) {
97109
var pending = bufs.length;
98110

99111
// Queue the request to be able to cancel it
100-
self._enqueue(req);
112+
const item = self._enqueue('write', req);
101113

102114
self.stream.cork();
103115
bufs.forEach(function(buf) {
@@ -115,7 +127,7 @@ StreamWrap.prototype.doWrite = function doWrite(req, bufs) {
115127
// Ensure that write was dispatched
116128
setImmediate(function() {
117129
// Do not invoke callback twice
118-
if (!self._dequeue(req))
130+
if (!self._dequeue(item))
119131
return;
120132

121133
var errCode = 0;
@@ -134,39 +146,47 @@ StreamWrap.prototype.doWrite = function doWrite(req, bufs) {
134146
return 0;
135147
};
136148

137-
StreamWrap.prototype._enqueue = function enqueue(req) {
149+
function QueueItem(type, req) {
150+
this.type = type;
151+
this.req = req;
152+
this.prev = this;
153+
this.next = this;
154+
}
155+
156+
StreamWrap.prototype._enqueue = function enqueue(type, req) {
157+
const item = new QueueItem(type, req);
138158
if (this._queue === null) {
139-
this._queue = req;
140-
req._prev = req;
141-
req._next = req;
142-
return;
159+
this._queue = item;
160+
return item;
143161
}
144162

145-
req._next = this._queue._next;
146-
req._prev = this._queue;
147-
req._next._prev = req;
148-
req._prev._next = req;
163+
item.next = this._queue.next;
164+
item.prev = this._queue;
165+
item.next.prev = item;
166+
item.prev.next = item;
167+
168+
return item;
149169
};
150170

151-
StreamWrap.prototype._dequeue = function dequeue(req) {
152-
var next = req._next;
153-
var prev = req._prev;
171+
StreamWrap.prototype._dequeue = function dequeue(item) {
172+
var next = item.next;
173+
var prev = item.prev;
154174

155175
if (next === null && prev === null)
156176
return false;
157177

158-
req._next = null;
159-
req._prev = null;
178+
item.next = null;
179+
item.prev = null;
160180

161-
if (next === req) {
181+
if (next === item) {
162182
prev = null;
163183
next = null;
164184
} else {
165-
prev._next = next;
166-
next._prev = prev;
185+
prev.next = next;
186+
next.prev = prev;
167187
}
168188

169-
if (this._queue === req)
189+
if (this._queue === item)
170190
this._queue = next;
171191

172192
return true;
@@ -178,12 +198,17 @@ StreamWrap.prototype.doClose = function doClose(cb) {
178198

179199
setImmediate(function() {
180200
while (self._queue !== null) {
181-
const req = self._queue;
182-
self._dequeue(req);
201+
const item = self._queue;
202+
const req = item.req;
203+
self._dequeue(item);
183204

184205
const errCode = uv.UV_ECANCELED;
185-
handle.doAfterWrite(req);
186-
handle.finishWrite(req, errCode);
206+
if (item.type === 'write') {
207+
handle.doAfterWrite(req);
208+
handle.finishWrite(req, errCode);
209+
} else if (item.type === 'shutdown') {
210+
handle.finishShutdown(req, errCode);
211+
}
187212
}
188213

189214
// Should be already set by net.js

test/parallel/test-stream-wrap.js

+14-21
Original file line numberDiff line numberDiff line change
@@ -6,41 +6,34 @@ const StreamWrap = require('_stream_wrap');
66
const Duplex = require('stream').Duplex;
77
const ShutdownWrap = process.binding('stream_wrap').ShutdownWrap;
88

9+
var done = false;
10+
911
function testShutdown(callback) {
1012
var stream = new Duplex({
1113
read: function() {
1214
},
13-
write: function(data, enc, callback) {
14-
callback(null);
15+
write: function() {
1516
}
1617
});
1718

1819
var wrap = new StreamWrap(stream);
1920

2021
var req = new ShutdownWrap();
21-
req.oncomplete = function() {};
22+
req.oncomplete = function(code) {
23+
assert(code < 0);
24+
callback();
25+
};
2226
req.handle = wrap._handle;
23-
wrap._handle.shutdown(req);
2427

28+
// Close the handle to simulate
2529
wrap.destroy();
26-
27-
process.nextTick(callback);
28-
}
29-
30-
function testReadAfterClose(callback) {
31-
var stream = new Duplex({
32-
read: function() {
33-
},
34-
write: function(data, enc, callback) {
35-
callback(null);
36-
}
37-
});
38-
stream.push('data');
39-
stream.push(null);
40-
41-
var wrap = new StreamWrap(stream);
30+
req.handle.shutdown(req);
4231
}
4332

4433
testShutdown(function() {
45-
testReadAfterClose();
34+
done = true;
35+
});
36+
37+
process.on('exit', function() {
38+
assert(done);
4639
});

0 commit comments

Comments
 (0)