Skip to content

Commit

Permalink
fix: use pump to close request stream (#287)
Browse files Browse the repository at this point in the history
closes #286
  • Loading branch information
fengmk2 authored Jun 1, 2018
1 parent 8087683 commit 6bc31b9
Show file tree
Hide file tree
Showing 8 changed files with 186 additions and 47 deletions.
3 changes: 0 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ node_js:
- '4'
- '6'
- '8'
- '9'
- '10'
env:
- AGENT_VERSION=2
Expand All @@ -20,8 +19,6 @@ matrix:
env: AGENT_VERSION=2
- node_js: '8'
env: AGENT_VERSION=2
- node_js: '9'
env: AGENT_VERSION=2
- node_js: '10'
env: AGENT_VERSION=2
before_install:
Expand Down
96 changes: 61 additions & 35 deletions lib/urllib.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ var ms = require('humanize-ms');
var statuses = require('statuses');
var contentTypeParser = require('content-type');
var first = require('ee-first');
var pump = require('pump');
var detectProxyAgent = require('./detect_proxy_agent');

var _Promise;
Expand Down Expand Up @@ -583,6 +584,7 @@ exports.requestWithCallback = function requestWithCallback(url, args, callback)
}

var writeStream = args.writeStream;
var isWriteStreamClose = false;

debug('Request#%d %s %s with headers %j, options.path: %s',
reqId, method, url, options.headers, options.path);
Expand Down Expand Up @@ -652,29 +654,47 @@ exports.requestWithCallback = function requestWithCallback(url, args, callback)
res.resume();
// end ths stream first
writeStream.end();
return done(result.error, null, res);
done(result.error, null, res);
return;
}

// you can set consumeWriteStream false that only wait response end
if (args.consumeWriteStream === false) {
res.on('end', done.bind(null, null, null, res));
} else {
// node 0.10, 0.12: only emit res aborted, writeStream close not fired
if (isNode010 || isNode012) {
first([
[ writeStream, 'close' ],
[ res, 'aborted' ],
], function(_, stream, event) {
debug('Request#%d %s: writeStream or res %s event emitted', reqId, url, event);
done(__err || null, null, res);
});
} else {
writeStream.on('close', function() {
debug('Request#%d %s: writeStream close event emitted', reqId, url);
done(__err || null, null, res);
});
}
pump(res, writeStream, function(err) {
if (isWriteStreamClose) {
return;
}
isWriteStreamClose = true;
debug('Request#%d %s: writeStream close, error: %s', reqId, url, err);
});
return;
}
return res.pipe(writeStream);

// node 0.10, 0.12: only emit res aborted, writeStream close not fired
if (isNode010 || isNode012) {
first([
[ writeStream, 'close' ],
[ res, 'aborted' ],
], function(_, stream, event) {
debug('Request#%d %s: writeStream or res %s event emitted', reqId, url, event);
done(__err || null, null, res);
});
res.pipe(writeStream);
return;
}

debug('Request#%d %s: pump res to writeStream', reqId, url);
pump(res, writeStream, function(err) {
debug('Request#%d %s: writeStream close event emitted, error: %s, isWriteStreamClose: %s',
reqId, url, err, isWriteStreamClose);
if (isWriteStreamClose) {
return;
}
isWriteStreamClose = true;
done(__err || err, null, res);
});
return;
}

// Otherwise, just concat those buffers.
Expand Down Expand Up @@ -835,7 +855,13 @@ exports.requestWithCallback = function requestWithCallback(url, args, callback)
startConnectTimer();
}

var isRequestAborted = false;
function abortRequest() {
if (isRequestAborted) {
return;
}
isRequestAborted = true;

debug('Request#%d %s abort, connected: %s', reqId, url, connected);
// it wont case error event when req haven't been assigned a socket yet.
if (!req.socket) {
Expand Down Expand Up @@ -909,33 +935,33 @@ exports.requestWithCallback = function requestWithCallback(url, args, callback)
startResposneTimer();
});

req.on('error', function (err) {
if (err.name === 'Error') {
err.name = connected ? 'ResponseError' : 'RequestError';
}
err.message += ' (req "error")';
debug('Request#%d %s `req error` event emit, %s: %s', reqId, url, err.name, err.message);
done(__err || err);
});

if (writeStream) {
writeStream.once('error', function (err) {
writeStream.once('error', function(err) {
err.message += ' (writeStream "error")';
__err = err;
debug('Request#%d %s `writeStream error` event emit, %s: %s', reqId, url, err.name, err.message);
abortRequest();
});
}

var isRequestError = false;
function handleRequestError(err) {
if (isRequestError || !err) {
return;
}
isRequestError = true;

if (err.name === 'Error') {
err.name = connected ? 'ResponseError' : 'RequestError';
}
debug('Request#%d %s `req error` event emit, %s: %s', reqId, url, err.name, err.message);
done(__err || err);
}
if (args.stream) {
args.stream.pipe(req);
args.stream.once('error', function (err) {
err.message += ' (stream "error")';
__err = err;
debug('Request#%d %s `readStream error` event emit, %s: %s', reqId, url, err.name, err.message);
abortRequest();
});
debug('Request#%d pump args.stream to req', reqId);
pump(args.stream, req, handleRequestError);
} else {
req.on('error', handleRequestError);
req.end(body);
}

Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
"iconv-lite": "^0.4.15",
"ip": "^1.1.5",
"proxy-agent": "^2.1.0",
"pump": "^3.0.0",
"qs": "^6.4.0",
"statuses": "^1.3.1",
"utility": "^1.12.0"
Expand Down
2 changes: 1 addition & 1 deletion test/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module.exports = process.env.CI ? {
// npmjs.com do not support gzip now
npmWeb: 'https://cnpmjs.org',
npmRegistry: 'https://registry.npmjs.com',
npmHttpRegistry: 'http://registry.npmjs.com',
npmHttpRegistry: 'http://registry.npm.taobao.org',
} : {
npmWeb: 'https://npm.taobao.org',
npmRegistry: 'https://registry.npm.taobao.org',
Expand Down
9 changes: 9 additions & 0 deletions test/fixtures/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ var server = http.createServer(function (req, res) {
return;
}

if (req.url === '/block') {
return;
}

var chunks = [];
var size = 0;
req.on('data', function (buf) {
Expand All @@ -63,6 +67,11 @@ var server = http.createServer(function (req, res) {
return setTimeout(function () {
res.end('timeout 700ms');
}, 700);
} else if (req.url === '/response_timeout_10s') {
res.write('foo');
return setTimeout(function () {
res.end('timeout 10000ms');
}, 10000);
} else if (req.url === '/error') {
return res.destroy();
} else if (req.url === '/socket.destroy') {
Expand Down
5 changes: 3 additions & 2 deletions test/proxy.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ var proxy = require('./fixtures/reverse-proxy');
var isNode010 = /^v0\.10\.\d+$/.test(process.version);
var isNode012 = /^v0\.12\.\d+$/.test(process.version);

var testUrl = process.env.CI ? 'https://registry.npmjs.org' : 'https://registry.cnpmjs.org';
var testUrl = process.env.CI ? 'https://registry.npmjs.com' : 'https://r.cnpmjs.org';

if (!isNode010 && !isNode012) {
describe('test/proxy.test.js', function() {
Expand All @@ -25,12 +25,13 @@ if (!isNode010 && !isNode012) {
});

it('should proxy http work', function(done) {
urllib.request(testUrl.replace('https', 'http') + '/pedding/latest', {
urllib.request('http://registry.npm.taobao.org/pedding/1.0.0', {
dataType: 'json',
enableProxy: true,
proxy: proxyUrl,
}, function(err, data, res) {
assert(!err);
console.log(res.headers);
assert(data.name === 'pedding');
assert(res.status === 200);
done();
Expand Down
104 changes: 104 additions & 0 deletions test/request-with-stream.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
'use strict';

var assert = require('assert');
var fs = require('fs');
var path = require('path');
var urllib = require('..');
var server = require('./fixtures/server');

describe('request-with-stream.test.js', function() {
var isNode012 = /^v0\.12\.\d+$/.test(process.version);
if (isNode012) {
return;
}

var app;
var port;
before(function (done) {
app = server.listen(0, function () {
port = app.address().port;
done();
});
});
after(function() {
app.close();
});

it('should close stream when request timeout', function(done) {
var tmpfile = path.join(__dirname, '.tmp.txt');
var buf = Buffer.alloc && Buffer.alloc(10 * 1024 * 1024) || new Buffer(10 * 1024 * 1024);
fs.writeFileSync(tmpfile, buf);
var stream = fs.createReadStream(tmpfile);
var args = {
method: 'POST',
stream: stream,
timeout: 1000,
};
var streamClosed = false;
stream.on('close', function() {
streamClosed = true;
console.log('stream close fired');
});
urllib.request('http://localhost:' + port + '/block', args, function(err, res) {
assert(err);
assert(err.name === 'ResponseTimeoutError');
assert(err.message.indexOf('timeout for 1000ms') > 0);
assert(!res);
setTimeout(function() {
assert(streamClosed);
done();
}, 100);
});
});

it('should close writeStream when request timeout', function(done) {
var tmpfile = path.join(__dirname, '.tmp.txt');
var writeStream = fs.createWriteStream(tmpfile);
var args = {
method: 'POST',
writeStream: writeStream,
timeout: 1000,
};
var streamClosed = false;
writeStream.on('close', function() {
streamClosed = true;
console.log('writeStream close fired');
});
urllib.request('http://localhost:' + port + '/response_timeout_10s', args, function(err, res) {
assert(err);
assert(err.name === 'ResponseTimeoutError');
assert(err.message.indexOf('timeout for 1000ms') > 0);
assert(!res);
setTimeout(function() {
assert(streamClosed);
done();
}, 100);
});
});

it('should handle writeStream when writeStream emit error', function(done) {
var tmpfile = path.join(__dirname, '.tmpnotexists/.tmp.txt.notexists');
var writeStream = fs.createWriteStream(tmpfile);
var args = {
method: 'POST',
writeStream: writeStream,
timeout: 1000,
};
var streamError = false;
writeStream.on('error', function(err) {
streamError = true;
console.log('writeStream error fired: %s', err);
});
urllib.request('http://localhost:' + port + '/response_timeout_10s', args, function(err, res) {
assert(err);
assert(err.name === 'Error');
assert(err.code === 'ENOENT');
assert(err.message.indexOf('ENOENT: no such file or directory, open') === 0);
assert(!res);
setTimeout(function() {
assert(streamError);
done();
}, 100);
});
});
});
13 changes: 7 additions & 6 deletions test/urllib.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ describe('test/urllib.test.js', function () {
describe('request()', function () {

it('should request(host-only) work', function(done) {
var host = urlutil.parse(config.npmRegistry).host;
var host = urlutil.parse('http://r.cnpmjs.org').host;
urllib.request(host, { timeout: 30000 }, function(err, data, res) {
assert(!err);
assert(data instanceof Buffer);
Expand Down Expand Up @@ -185,10 +185,11 @@ describe('test/urllib.test.js', function () {
});

it('should alias curl() work', function (done) {
urllib.curl(config.npmHttpRegistry + '/pedding/latest', {timeout: 25000},
urllib.curl(config.npmRegistry + '/pedding/1.0.0', {timeout: 25000},
function (err, data, res) {
assert(!err);
assert(Buffer.isBuffer(data));
console.log(res.headers);
assert(res.statusCode === 200);
done();
});
Expand Down Expand Up @@ -367,7 +368,7 @@ describe('test/urllib.test.js', function () {
assert(err);
assert(err.name === 'ResponseError');
err.code && assert(err.code === 'HPE_INVALID_CHUNK_SIZE');
assert(err.message.indexOf('Parse Error (req "error"), GET http://127.0.0.1:') >= 0);
assert(err.message.indexOf('Parse Error, GET http://127.0.0.1:') >= 0);
assert(err.bytesParsed === 2);
assert(!data);
done();
Expand Down Expand Up @@ -738,12 +739,12 @@ describe('test/urllib.test.js', function () {
var urls = [
config.npmRegistry + '/byte',
config.npmWeb,
config.npmHttpRegistry + '/pedding',
config.npmRegistry + '/pedding',

config.npmWeb + '/package/byte',
config.npmRegistry + '/pedding',
config.npmWeb + '/package/pedding',
config.npmHttpRegistry + '/byte',
config.npmRegistry + '/byte',
];

urls.forEach(function (url, index) {
Expand Down Expand Up @@ -772,7 +773,7 @@ describe('test/urllib.test.js', function () {
it('should request http timeout', function (done) {
var agent = this.agent;
var httpsAgent = this.httpsAgent;
urllib.request(config.npmHttpRegistry + '/byte', {
urllib.request(config.npmHttpRegistry + '/byte/2.0.0', {
agent: agent,
httpsAgent: httpsAgent,
timeout: 25000,
Expand Down

0 comments on commit 6bc31b9

Please sign in to comment.