Skip to content

Commit

Permalink
Fix socket read timeout (#1507)
Browse files Browse the repository at this point in the history
* Nodejs socket read timeouts now honored even after response headers received.

* Updates shaky-stream mock to use new Buffer

* Changes update type to more accurately reflect a bug fix
  • Loading branch information
chrisradek authored May 11, 2017
1 parent 2c1b94e commit 8904e9c
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 22 deletions.
5 changes: 5 additions & 0 deletions .changes/next-release/feature-Request-97402bff.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"type": "bugfix",
"category": "Request",
"description": "Updates node.js request handling to obey socket read timeouts after response headers have been received. Previously timeouts were being ignored once headers were received, sometimes causing connections to hang."
}
7 changes: 5 additions & 2 deletions lib/event_listeners.js
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ AWS.EventListeners = {

function callback(httpResp) {
resp.httpResponse.stream = httpResp;
var stream = resp.request.httpRequest.stream;

httpResp.on('headers', function onHeaders(statusCode, headers, statusMessage) {
resp.request.emit(
Expand All @@ -250,8 +251,10 @@ AWS.EventListeners = {
});

httpResp.on('end', function onEnd() {
resp.request.emit('httpDone');
done();
if (!stream || !stream.didCallback) {
resp.request.emit('httpDone');
done();
}
});
}

Expand Down
12 changes: 6 additions & 6 deletions lib/http/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ require('../http');
AWS.NodeHttpClient = AWS.util.inherit({
handleRequest: function handleRequest(httpRequest, httpOptions, callback, errCallback) {
var self = this;
var cbAlreadyCalled = false;
var endpoint = httpRequest.endpoint;
var pathPrefix = '';
if (!httpOptions) httpOptions = {};
Expand Down Expand Up @@ -41,7 +40,7 @@ AWS.NodeHttpClient = AWS.util.inherit({
delete options.timeout; // timeout isn't an HTTP option

var stream = http.request(options, function (httpResp) {
if (cbAlreadyCalled) return; cbAlreadyCalled = true;
if (stream.didCallback) return;

callback(httpResp);
httpResp.emit(
Expand All @@ -52,13 +51,14 @@ AWS.NodeHttpClient = AWS.util.inherit({
);
});
httpRequest.stream = stream; // attach stream to httpRequest
stream.didCallback = false;

// connection timeout support
if (httpOptions.connectTimeout) {
stream.on('socket', function(socket) {
if (socket.connecting) {
var timeoutId = setTimeout(function connectTimeout() {
if (cbAlreadyCalled) return; cbAlreadyCalled = true;
if (stream.didCallback) return; stream.didCallback = true;

stream.abort();
errCallback(AWS.util.error(
Expand All @@ -77,16 +77,16 @@ AWS.NodeHttpClient = AWS.util.inherit({

// timeout support
stream.setTimeout(httpOptions.timeout || 0, function() {
if (cbAlreadyCalled) return; cbAlreadyCalled = true;
if (stream.didCallback) return; stream.didCallback = true;

var msg = 'Connection timed out after ' + httpOptions.timeout + 'ms';
errCallback(AWS.util.error(new Error(msg), {code: 'TimeoutError'}));
stream.abort();
});

stream.on('error', function() {
if (cbAlreadyCalled) return; cbAlreadyCalled = true;
errCallback.apply(this, arguments);
if (stream.didCallback) return; stream.didCallback = true;
errCallback.apply(stream, arguments);
});

var expect = httpRequest.headers.Expect || httpRequest.headers.expect;
Expand Down
9 changes: 7 additions & 2 deletions lib/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -612,9 +612,9 @@ AWS.Request = inherit({
} else if (AWS.HttpClient.streamsApiVersion === 2) {
stream.end();
} else {
stream.emit('end')
stream.emit('end');
}
}
};

var httpStream = resp.httpResponse.createUnbufferedStream();

Expand All @@ -629,6 +629,11 @@ AWS.Request = inherit({
};

lengthAccumulator.on('end', checkContentLengthAndEmit);
stream.on('error', function(err) {
shouldCheckContentLength = false;
lengthAccumulator.emit('end');
lengthAccumulator.end();
});
httpStream.pipe(lengthAccumulator).pipe(stream, { end: false });
} else {
httpStream.pipe(stream);
Expand Down
45 changes: 45 additions & 0 deletions test/mocks/shaky-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
var stream = require('stream');
var util = require('util');

var Readable = stream.Readable;

var timeoutFn = typeof setTimeoutOrig === 'function' ? setTimeoutOrig : setTimeout;

/**
* ShakyStream will send data in 2 parts, pausing between parts.
*/
function ShakyStream(options) {
if (!(this instanceof ShakyStream)) {
return new ShakyStream(options);
}
if (!options.highWaterMark) {
options.highWaterMark = 1024 * 16;
}
this._shakyTime = options.pauseFor;
this._didStart = false;
this._isPaused = false;

Readable.call(this, options);

}

util.inherits(ShakyStream, Readable);

ShakyStream.prototype._read = function _read(size) {
if (!this._didStart) {
this._didStart = true;
this.push(new Buffer('{"Count":1,"Items":[{"id":{"S":"2016-12-11"},"dateUTC":{"N":"1481494545591"},'));
}
if (this._didStart && this._isPaused) {
return;
} else if (this._didStart) {
this._isPaused = true;
var self = this;
timeoutFn(function() {
self.push(new Buffer('"javascript":{"M":{"foo":{"S":"bar"},"baz":{"S":"buz"}}}}],"ScannedCount":1}'));
self.push(null);
}, this._shakyTime);
}
};

module.exports = ShakyStream;
52 changes: 51 additions & 1 deletion test/node_http_client.spec.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 9 additions & 11 deletions test/request.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,7 @@ describe('AWS.Request', function() {
'content-length': '12'
});
resp.write('FOOBARBAZQUX');
return resp.end();
resp.end();
};
request = service.makeRequest('mockMethod');
s = request.createReadStream();
Expand All @@ -708,9 +708,9 @@ describe('AWS.Request', function() {
expect(error).to.be["null"];
});
s.on('data', function(c) {
return data += c.toString();
data += c.toString();
});
return s.on('end', function() {
s.on('end', function() {
expect(error).to.be["null"];
expect(data).to.equal('FOOBARBAZQUX');
done();
Expand Down Expand Up @@ -744,7 +744,7 @@ describe('AWS.Request', function() {
});
});

it('only accepts data up to the specified content-length', function(done) {
it('errors if data received exceeds content-length', function(done) {
var request, s;
AWS.HttpClient.streamsApiVersion = 1;
app = function(req, resp) {
Expand All @@ -758,14 +758,13 @@ describe('AWS.Request', function() {
s = request.createReadStream();
s.on('error', function(e) {
error = e;
expect(error).to.be["null"];
expect(error).to.not.be["null"];
});
s.on('data', function(c) {
return data += c.toString();
});
return s.on('end', function() {
expect(error).to.be["null"];
expect(data).to.equal('FOOBARBAZQU');
expect(error).to.not.be["null"];
done();
});
});
Expand Down Expand Up @@ -875,7 +874,7 @@ describe('AWS.Request', function() {
});
});

it('only accepts data up to the specified content-length', function(done) {
it('errors if data received exceeds the content-length (streams2)', function(done) {
var request, s;
if (AWS.HttpClient.streamsApiVersion < 2) {
done();
Expand All @@ -891,11 +890,10 @@ describe('AWS.Request', function() {
s = request.createReadStream();
s.on('error', function(e) {
error = e;
expect(error).to.be["null"];
expect(error).to.not.be["null"];
});
s.on('end', function() {
expect(error).to.be["null"];
expect(data).to.equal('FOOBARBAZQU');
expect(error).to.not.be["null"];
done();
});
return s.on('readable', function() {
Expand Down

0 comments on commit 8904e9c

Please sign in to comment.