From 8904e9c730fb2fccf9d201f66266a6e2cbb75348 Mon Sep 17 00:00:00 2001 From: Christopher Radek Date: Thu, 11 May 2017 09:35:54 -0700 Subject: [PATCH] Fix socket read timeout (#1507) * Nodejs socket read timeouts now honored even after response headers received. * Updates shaky-stream mock to use new Buffer * Changes update type to more accurately reflect a bug fix --- .../feature-Request-97402bff.json | 5 ++ lib/event_listeners.js | 7 ++- lib/http/node.js | 12 ++--- lib/request.js | 9 +++- test/mocks/shaky-stream.js | 45 ++++++++++++++++ test/node_http_client.spec.js | 52 ++++++++++++++++++- test/request.spec.js | 20 ++++--- 7 files changed, 128 insertions(+), 22 deletions(-) create mode 100644 .changes/next-release/feature-Request-97402bff.json create mode 100644 test/mocks/shaky-stream.js diff --git a/.changes/next-release/feature-Request-97402bff.json b/.changes/next-release/feature-Request-97402bff.json new file mode 100644 index 0000000000..8965163dfb --- /dev/null +++ b/.changes/next-release/feature-Request-97402bff.json @@ -0,0 +1,5 @@ +{ + "type": "bugfix", + "category": "Request", + "description": "Updates node.js request handling to obey socket read timeouts after response headers have been received. Previously timeouts were being ignored once headers were received, sometimes causing connections to hang." +} \ No newline at end of file diff --git a/lib/event_listeners.js b/lib/event_listeners.js index 67b3575275..56337c2c6e 100644 --- a/lib/event_listeners.js +++ b/lib/event_listeners.js @@ -226,6 +226,7 @@ AWS.EventListeners = { function callback(httpResp) { resp.httpResponse.stream = httpResp; + var stream = resp.request.httpRequest.stream; httpResp.on('headers', function onHeaders(statusCode, headers, statusMessage) { resp.request.emit( @@ -250,8 +251,10 @@ AWS.EventListeners = { }); httpResp.on('end', function onEnd() { - resp.request.emit('httpDone'); - done(); + if (!stream || !stream.didCallback) { + resp.request.emit('httpDone'); + done(); + } }); } diff --git a/lib/http/node.js b/lib/http/node.js index 05f218cf4f..a7f142aeed 100644 --- a/lib/http/node.js +++ b/lib/http/node.js @@ -10,7 +10,6 @@ require('../http'); AWS.NodeHttpClient = AWS.util.inherit({ handleRequest: function handleRequest(httpRequest, httpOptions, callback, errCallback) { var self = this; - var cbAlreadyCalled = false; var endpoint = httpRequest.endpoint; var pathPrefix = ''; if (!httpOptions) httpOptions = {}; @@ -41,7 +40,7 @@ AWS.NodeHttpClient = AWS.util.inherit({ delete options.timeout; // timeout isn't an HTTP option var stream = http.request(options, function (httpResp) { - if (cbAlreadyCalled) return; cbAlreadyCalled = true; + if (stream.didCallback) return; callback(httpResp); httpResp.emit( @@ -52,13 +51,14 @@ AWS.NodeHttpClient = AWS.util.inherit({ ); }); httpRequest.stream = stream; // attach stream to httpRequest + stream.didCallback = false; // connection timeout support if (httpOptions.connectTimeout) { stream.on('socket', function(socket) { if (socket.connecting) { var timeoutId = setTimeout(function connectTimeout() { - if (cbAlreadyCalled) return; cbAlreadyCalled = true; + if (stream.didCallback) return; stream.didCallback = true; stream.abort(); errCallback(AWS.util.error( @@ -77,7 +77,7 @@ AWS.NodeHttpClient = AWS.util.inherit({ // timeout support stream.setTimeout(httpOptions.timeout || 0, function() { - if (cbAlreadyCalled) return; cbAlreadyCalled = true; + if (stream.didCallback) return; stream.didCallback = true; var msg = 'Connection timed out after ' + httpOptions.timeout + 'ms'; errCallback(AWS.util.error(new Error(msg), {code: 'TimeoutError'})); @@ -85,8 +85,8 @@ AWS.NodeHttpClient = AWS.util.inherit({ }); stream.on('error', function() { - if (cbAlreadyCalled) return; cbAlreadyCalled = true; - errCallback.apply(this, arguments); + if (stream.didCallback) return; stream.didCallback = true; + errCallback.apply(stream, arguments); }); var expect = httpRequest.headers.Expect || httpRequest.headers.expect; diff --git a/lib/request.js b/lib/request.js index 0918215b45..7f44767a27 100644 --- a/lib/request.js +++ b/lib/request.js @@ -612,9 +612,9 @@ AWS.Request = inherit({ } else if (AWS.HttpClient.streamsApiVersion === 2) { stream.end(); } else { - stream.emit('end') + stream.emit('end'); } - } + }; var httpStream = resp.httpResponse.createUnbufferedStream(); @@ -629,6 +629,11 @@ AWS.Request = inherit({ }; lengthAccumulator.on('end', checkContentLengthAndEmit); + stream.on('error', function(err) { + shouldCheckContentLength = false; + lengthAccumulator.emit('end'); + lengthAccumulator.end(); + }); httpStream.pipe(lengthAccumulator).pipe(stream, { end: false }); } else { httpStream.pipe(stream); diff --git a/test/mocks/shaky-stream.js b/test/mocks/shaky-stream.js new file mode 100644 index 0000000000..be2d324d7d --- /dev/null +++ b/test/mocks/shaky-stream.js @@ -0,0 +1,45 @@ +var stream = require('stream'); +var util = require('util'); + +var Readable = stream.Readable; + +var timeoutFn = typeof setTimeoutOrig === 'function' ? setTimeoutOrig : setTimeout; + +/** + * ShakyStream will send data in 2 parts, pausing between parts. + */ +function ShakyStream(options) { + if (!(this instanceof ShakyStream)) { + return new ShakyStream(options); + } + if (!options.highWaterMark) { + options.highWaterMark = 1024 * 16; + } + this._shakyTime = options.pauseFor; + this._didStart = false; + this._isPaused = false; + + Readable.call(this, options); + +} + +util.inherits(ShakyStream, Readable); + +ShakyStream.prototype._read = function _read(size) { + if (!this._didStart) { + this._didStart = true; + this.push(new Buffer('{"Count":1,"Items":[{"id":{"S":"2016-12-11"},"dateUTC":{"N":"1481494545591"},')); + } + if (this._didStart && this._isPaused) { + return; + } else if (this._didStart) { + this._isPaused = true; + var self = this; + timeoutFn(function() { + self.push(new Buffer('"javascript":{"M":{"foo":{"S":"bar"},"baz":{"S":"buz"}}}}],"ScannedCount":1}')); + self.push(null); + }, this._shakyTime); + } +}; + +module.exports = ShakyStream; \ No newline at end of file diff --git a/test/node_http_client.spec.js b/test/node_http_client.spec.js index 491b8c774a..4bc86cdeae 100644 --- a/test/node_http_client.spec.js +++ b/test/node_http_client.spec.js @@ -3,7 +3,7 @@ var AWS, EventEmitter, helpers, httpModule; helpers = require('./helpers'); - + var ShakyStream = require('./mocks/shaky-stream'); AWS = helpers.AWS; EventEmitter = require('events').EventEmitter; @@ -76,6 +76,56 @@ return expect(numCalls).to.equal(1); }); }); + describe('timeout', function() { + it('is obeyed even after response headers are recieved', function(done) { + // a mock server with 'ShakyStream' allows us to simulate a period of socket inactivity + var server = httpModule.createServer(function(req, res) { + res.setHeader('Content-Type', 'application/json'); + var ss = new ShakyStream({ + pauseFor: 1000 // simulate 1 second pause while receiving data + }); + ss.pipe(res); + }).listen(3334); + var ddb = new AWS.DynamoDB({ + httpOptions: { + timeout: 100 + }, + endpoint: 'http://127.0.0.1:3334' + }); + ddb.scan({ + TableName: 'fake' + }, function(err, data) { + server.close(); + expect(err.name).to.equal('TimeoutError'); + done(); + }); + }); + + it('does not trigger unnecessarily', function(done) { + // a mock server with 'ShakyStream' allows us to simulate a period of socket inactivity + var server = httpModule.createServer(function(req, res) { + res.setHeader('Content-Type', 'application/json'); + var ss = new ShakyStream({ + pauseFor: 100 // simulate 100 ms pause while receiving data + }); + ss.pipe(res); + }).listen(3334); + var ddb = new AWS.DynamoDB({ + httpOptions: { + timeout: 1000 + }, + endpoint: 'http://127.0.0.1:3334' + }); + ddb.scan({ + TableName: 'fake' + }, function(err, data) { + server.close(); + expect(err).to.eql(null); + done(); + }); + }); + }); + return describe('connectTimeout', function() { var clearTimeoutSpy, mockClientRequest, oldClearTimeout, oldRequest, oldSetTimeout, requestSpy, setTimeoutSpy, timeoutId; timeoutId = 'TIMEOUT_ID'; diff --git a/test/request.spec.js b/test/request.spec.js index 0650346249..cdcae4aefd 100644 --- a/test/request.spec.js +++ b/test/request.spec.js @@ -699,7 +699,7 @@ describe('AWS.Request', function() { 'content-length': '12' }); resp.write('FOOBARBAZQUX'); - return resp.end(); + resp.end(); }; request = service.makeRequest('mockMethod'); s = request.createReadStream(); @@ -708,9 +708,9 @@ describe('AWS.Request', function() { expect(error).to.be["null"]; }); s.on('data', function(c) { - return data += c.toString(); + data += c.toString(); }); - return s.on('end', function() { + s.on('end', function() { expect(error).to.be["null"]; expect(data).to.equal('FOOBARBAZQUX'); done(); @@ -744,7 +744,7 @@ describe('AWS.Request', function() { }); }); - it('only accepts data up to the specified content-length', function(done) { + it('errors if data received exceeds content-length', function(done) { var request, s; AWS.HttpClient.streamsApiVersion = 1; app = function(req, resp) { @@ -758,14 +758,13 @@ describe('AWS.Request', function() { s = request.createReadStream(); s.on('error', function(e) { error = e; - expect(error).to.be["null"]; + expect(error).to.not.be["null"]; }); s.on('data', function(c) { return data += c.toString(); }); return s.on('end', function() { - expect(error).to.be["null"]; - expect(data).to.equal('FOOBARBAZQU'); + expect(error).to.not.be["null"]; done(); }); }); @@ -875,7 +874,7 @@ describe('AWS.Request', function() { }); }); - it('only accepts data up to the specified content-length', function(done) { + it('errors if data received exceeds the content-length (streams2)', function(done) { var request, s; if (AWS.HttpClient.streamsApiVersion < 2) { done(); @@ -891,11 +890,10 @@ describe('AWS.Request', function() { s = request.createReadStream(); s.on('error', function(e) { error = e; - expect(error).to.be["null"]; + expect(error).to.not.be["null"]; }); s.on('end', function() { - expect(error).to.be["null"]; - expect(data).to.equal('FOOBARBAZQU'); + expect(error).to.not.be["null"]; done(); }); return s.on('readable', function() {