Skip to content

Commit 87fd5b7

Browse files
addaleaxgibfahn
authored andcommitted
lib: move _stream_wrap into internals
This makes a subsequent possible deprecation easier. PR-URL: #16158 Backport-PR-URL: #16626 Reviewed-By: Colin Ihrig <cjihrig@gmail.com> Reviewed-By: Franziska Hinkelmann <franziska.hinkelmann@gmail.com> Reviewed-By: Tobias Nießen <tniessen@tnie.de>
1 parent 9bea207 commit 87fd5b7

File tree

3 files changed

+229
-225
lines changed

3 files changed

+229
-225
lines changed

lib/_stream_wrap.js

+1-225
Original file line numberDiff line numberDiff line change
@@ -1,227 +1,3 @@
11
'use strict';
22

3-
const assert = require('assert');
4-
const util = require('util');
5-
// TODO(bmeurer): Change this back to const once hole checks are
6-
// properly optimized away early in Ignition+TurboFan.
7-
var Buffer = require('buffer').Buffer;
8-
const { Socket } = require('net');
9-
const { JSStream } = process.binding('js_stream');
10-
const uv = process.binding('uv');
11-
const debug = util.debuglog('stream_wrap');
12-
13-
function StreamWrap(stream) {
14-
const handle = new JSStream();
15-
16-
this.stream = stream;
17-
18-
this._list = null;
19-
20-
const self = this;
21-
handle.close = function(cb) {
22-
debug('close');
23-
self.doClose(cb);
24-
};
25-
handle.isAlive = function() {
26-
return self.isAlive();
27-
};
28-
handle.isClosing = function() {
29-
return self.isClosing();
30-
};
31-
handle.onreadstart = function() {
32-
return self.readStart();
33-
};
34-
handle.onreadstop = function() {
35-
return self.readStop();
36-
};
37-
handle.onshutdown = function(req) {
38-
return self.doShutdown(req);
39-
};
40-
handle.onwrite = function(req, bufs) {
41-
return self.doWrite(req, bufs);
42-
};
43-
44-
this.stream.pause();
45-
this.stream.on('error', function onerror(err) {
46-
self.emit('error', err);
47-
});
48-
this.stream.on('data', function ondata(chunk) {
49-
if (!(chunk instanceof Buffer)) {
50-
// Make sure that no further `data` events will happen
51-
this.pause();
52-
this.removeListener('data', ondata);
53-
54-
self.emit('error', new Error('Stream has StringDecoder'));
55-
return;
56-
}
57-
58-
debug('data', chunk.length);
59-
if (self._handle)
60-
self._handle.readBuffer(chunk);
61-
});
62-
this.stream.once('end', function onend() {
63-
debug('end');
64-
if (self._handle)
65-
self._handle.emitEOF();
66-
});
67-
68-
Socket.call(this, {
69-
handle: handle
70-
});
71-
}
72-
util.inherits(StreamWrap, Socket);
73-
module.exports = StreamWrap;
74-
75-
// require('_stream_wrap').StreamWrap
76-
StreamWrap.StreamWrap = StreamWrap;
77-
78-
StreamWrap.prototype.isAlive = function isAlive() {
79-
return true;
80-
};
81-
82-
StreamWrap.prototype.isClosing = function isClosing() {
83-
return !this.readable || !this.writable;
84-
};
85-
86-
StreamWrap.prototype.readStart = function readStart() {
87-
this.stream.resume();
88-
return 0;
89-
};
90-
91-
StreamWrap.prototype.readStop = function readStop() {
92-
this.stream.pause();
93-
return 0;
94-
};
95-
96-
StreamWrap.prototype.doShutdown = function doShutdown(req) {
97-
const self = this;
98-
const handle = this._handle;
99-
const item = this._enqueue('shutdown', req);
100-
101-
this.stream.end(function() {
102-
// Ensure that write was dispatched
103-
setImmediate(function() {
104-
if (!self._dequeue(item))
105-
return;
106-
107-
handle.finishShutdown(req, 0);
108-
});
109-
});
110-
return 0;
111-
};
112-
113-
StreamWrap.prototype.doWrite = function doWrite(req, bufs) {
114-
const self = this;
115-
const handle = self._handle;
116-
117-
var pending = bufs.length;
118-
119-
// Queue the request to be able to cancel it
120-
const item = self._enqueue('write', req);
121-
122-
self.stream.cork();
123-
for (var n = 0; n < bufs.length; n++)
124-
self.stream.write(bufs[n], done);
125-
self.stream.uncork();
126-
127-
function done(err) {
128-
if (!err && --pending !== 0)
129-
return;
130-
131-
// Ensure that this is called once in case of error
132-
pending = 0;
133-
134-
// Ensure that write was dispatched
135-
setImmediate(function() {
136-
// Do not invoke callback twice
137-
if (!self._dequeue(item))
138-
return;
139-
140-
var errCode = 0;
141-
if (err) {
142-
if (err.code && uv['UV_' + err.code])
143-
errCode = uv['UV_' + err.code];
144-
else
145-
errCode = uv.UV_EPIPE;
146-
}
147-
148-
handle.doAfterWrite(req);
149-
handle.finishWrite(req, errCode);
150-
});
151-
}
152-
153-
return 0;
154-
};
155-
156-
function QueueItem(type, req) {
157-
this.type = type;
158-
this.req = req;
159-
this.prev = this;
160-
this.next = this;
161-
}
162-
163-
StreamWrap.prototype._enqueue = function _enqueue(type, req) {
164-
const item = new QueueItem(type, req);
165-
if (this._list === null) {
166-
this._list = item;
167-
return item;
168-
}
169-
170-
item.next = this._list.next;
171-
item.prev = this._list;
172-
item.next.prev = item;
173-
item.prev.next = item;
174-
175-
return item;
176-
};
177-
178-
StreamWrap.prototype._dequeue = function _dequeue(item) {
179-
assert(item instanceof QueueItem);
180-
181-
var next = item.next;
182-
var prev = item.prev;
183-
184-
if (next === null && prev === null)
185-
return false;
186-
187-
item.next = null;
188-
item.prev = null;
189-
190-
if (next === item) {
191-
prev = null;
192-
next = null;
193-
} else {
194-
prev.next = next;
195-
next.prev = prev;
196-
}
197-
198-
if (this._list === item)
199-
this._list = next;
200-
201-
return true;
202-
};
203-
204-
StreamWrap.prototype.doClose = function doClose(cb) {
205-
const self = this;
206-
const handle = self._handle;
207-
208-
setImmediate(function() {
209-
while (self._list !== null) {
210-
const item = self._list;
211-
const req = item.req;
212-
self._dequeue(item);
213-
214-
const errCode = uv.UV_ECANCELED;
215-
if (item.type === 'write') {
216-
handle.doAfterWrite(req);
217-
handle.finishWrite(req, errCode);
218-
} else if (item.type === 'shutdown') {
219-
handle.finishShutdown(req, errCode);
220-
}
221-
}
222-
223-
// Should be already set by net.js
224-
assert(self._handle === null);
225-
cb();
226-
});
227-
};
3+
module.exports = require('internal/wrap_js_stream');

0 commit comments

Comments
 (0)