diff --git a/lib/_http_client.js b/lib/_http_client.js index ef58f66f6a97e5..c3aa45f406d2c7 100644 --- a/lib/_http_client.js +++ b/lib/_http_client.js @@ -93,8 +93,8 @@ const { const kClientRequestStatistics = Symbol('ClientRequestStatistics'); const dc = require('diagnostics_channel'); -const onClientRequestStartChannel = dc.channel('http.client.request.start'); -const onClientResponseFinishChannel = dc.channel('http.client.response.finish'); +const CLIENT_REQUEST_START_CHANNEL = 'http.client.request.start'; +const CLIENT_RESPONSE_FINISH_CHANNEL = 'http.client.response.finish'; const { addAbortSignal, finished } = require('stream'); @@ -374,8 +374,8 @@ ClientRequest.prototype._finish = function _finish() { }, }); } - if (onClientRequestStartChannel.hasSubscribers) { - onClientRequestStartChannel.publish({ + if (dc.hasSubscribers(CLIENT_REQUEST_START_CHANNEL)) { + dc.publish(CLIENT_REQUEST_START_CHANNEL, { request: this, }); } @@ -660,8 +660,8 @@ function parserOnIncomingClient(res, shouldKeepAlive) { }, }); } - if (onClientResponseFinishChannel.hasSubscribers) { - onClientResponseFinishChannel.publish({ + if (dc.hasSubscribers(CLIENT_RESPONSE_FINISH_CHANNEL)) { + dc.publish(CLIENT_RESPONSE_FINISH_CHANNEL, { request: req, response: res, }); diff --git a/lib/_http_server.js b/lib/_http_server.js index a2eba953cfc2ba..c803cde55214cf 100644 --- a/lib/_http_server.js +++ b/lib/_http_server.js @@ -89,8 +89,8 @@ let debug = require('internal/util/debuglog').debuglog('http', (fn) => { }); const dc = require('diagnostics_channel'); -const onRequestStartChannel = dc.channel('http.server.request.start'); -const onResponseFinishChannel = dc.channel('http.server.response.finish'); +const SERVER_REQUEST_START_CHANNEL = 'http.server.request.start'; +const SERVER_RESPONSE_FINISH_CHANNEL = 'http.server.response.finish'; const kServerResponse = Symbol('ServerResponse'); const kServerResponseStatistics = Symbol('ServerResponseStatistics'); @@ -870,8 +870,8 @@ function clearIncoming(req) { } function resOnFinish(req, res, socket, state, server) { - if (onResponseFinishChannel.hasSubscribers) { - onResponseFinishChannel.publish({ + if (dc.hasSubscribers(SERVER_RESPONSE_FINISH_CHANNEL)) { + dc.publish(SERVER_RESPONSE_FINISH_CHANNEL, { request: req, response: res, socket, @@ -961,8 +961,8 @@ function parserOnIncoming(server, socket, state, req, keepAlive) { res.shouldKeepAlive = keepAlive; res[kUniqueHeaders] = server[kUniqueHeaders]; - if (onRequestStartChannel.hasSubscribers) { - onRequestStartChannel.publish({ + if (dc.hasSubscribers(SERVER_REQUEST_START_CHANNEL)) { + dc.publish(SERVER_REQUEST_START_CHANNEL, { request: req, response: res, socket, diff --git a/lib/dgram.js b/lib/dgram.js index 9a11a2287c6155..23747776abb784 100644 --- a/lib/dgram.js +++ b/lib/dgram.js @@ -75,7 +75,7 @@ const { } = internalBinding('udp_wrap'); const dc = require('diagnostics_channel'); -const udpSocketChannel = dc.channel('udp.socket'); +const UDP_SOCKET_CHANNEL = 'udp.socket'; const BIND_STATE_UNBOUND = 0; const BIND_STATE_BINDING = 1; @@ -148,8 +148,8 @@ function Socket(type, listener) { this.once('close', () => signal.removeEventListener('abort', onAborted)); } } - if (udpSocketChannel.hasSubscribers) { - udpSocketChannel.publish({ + if (dc.hasSubscribers(UDP_SOCKET_CHANNEL)) { + dc.publish(UDP_SOCKET_CHANNEL, { socket: this, }); } diff --git a/lib/diagnostics_channel.js b/lib/diagnostics_channel.js index f8c1edb96dfe8a..cf270f8de0db8d 100644 --- a/lib/diagnostics_channel.js +++ b/lib/diagnostics_channel.js @@ -136,10 +136,22 @@ function hasSubscribers(name) { return channel.hasSubscribers; } +function publish(name, data) { + let channel; + const ref = channels[name]; + if (ref) channel = ref.get(); + if (!channel) { + return false; + } + channel.publish(data); + return true; +} + module.exports = { channel, + Channel, hasSubscribers, subscribe, unsubscribe, - Channel + publish, }; diff --git a/lib/net.js b/lib/net.js index 548e5e7878a56b..fe090eb2cf41d9 100644 --- a/lib/net.js +++ b/lib/net.js @@ -131,18 +131,17 @@ const noop = () => {}; const kPerfHooksNetConnectContext = Symbol('kPerfHooksNetConnectContext'); -let netClientSocketChannel; -let netServerSocketChannel; +const CLIENT_SOCKET_CHANNEL = 'net.client.socket'; +const SERVER_SOCKET_CHANNEL = 'net.server.socket'; +let dc; function lazyChannels() { // TODO(joyeecheung): support diagnostics channels in the snapshot. // For now it is fine to create them lazily when there isn't a snapshot to // build. If users need the channels they would have to create them first // before invoking any built-ins that would publish to these channels // anyway. - if (netClientSocketChannel === undefined) { - const dc = require('diagnostics_channel'); - netClientSocketChannel = dc.channel('net.client.socket'); - netServerSocketChannel = dc.channel('net.server.socket'); + if (dc === undefined) { + dc = require('diagnostics_channel'); } } @@ -218,8 +217,8 @@ function connect(...args) { debug('createConnection', normalized); const socket = new Socket(options); lazyChannels(); - if (netClientSocketChannel.hasSubscribers) { - netClientSocketChannel.publish({ + if (dc.hasSubscribers(CLIENT_SOCKET_CHANNEL)) { + dc.publish(CLIENT_SOCKET_CHANNEL, { socket, }); } @@ -1762,8 +1761,8 @@ function onconnection(err, clientHandle) { socket._server = self; self.emit('connection', socket); lazyChannels(); - if (netServerSocketChannel.hasSubscribers) { - netServerSocketChannel.publish({ + if (dc.hasSubscribers(SERVER_SOCKET_CHANNEL)) { + dc.publish(SERVER_SOCKET_CHANNEL, { socket, }); } diff --git a/test/parallel/test-diagnostics-channel-http-server-start.js b/test/parallel/test-diagnostics-channel-http-server-start.js index 9a8136d4cc5839..ad2f6ba48872e2 100644 --- a/test/parallel/test-diagnostics-channel-http-server-start.js +++ b/test/parallel/test-diagnostics-channel-http-server-start.js @@ -6,21 +6,18 @@ const dc = require('diagnostics_channel'); const assert = require('assert'); const http = require('http'); -const incomingStartChannel = dc.channel('http.server.request.start'); -const outgoingFinishChannel = dc.channel('http.server.response.finish'); - const als = new AsyncLocalStorage(); let context; // Bind requests to an AsyncLocalStorage context -incomingStartChannel.subscribe(common.mustCall((message) => { +dc.subscribe('http.server.request.start', common.mustCall((message) => { als.enterWith(message); context = message; })); // When the request ends, verify the context has been maintained // and that the messages contain the expected data -outgoingFinishChannel.subscribe(common.mustCall((message) => { +dc.subscribe('http.server.response.finish', common.mustCall((message) => { const data = { request, response, diff --git a/test/parallel/test-diagnostics-channel-http.js b/test/parallel/test-diagnostics-channel-http.js index 4de08d9253c10a..c2e84444e2866e 100644 --- a/test/parallel/test-diagnostics-channel-http.js +++ b/test/parallel/test-diagnostics-channel-http.js @@ -5,26 +5,24 @@ const http = require('http'); const net = require('net'); const dc = require('diagnostics_channel'); -const onClientRequestStart = dc.channel('http.client.request.start'); -const onClientResponseFinish = dc.channel('http.client.response.finish'); -const onServerRequestStart = dc.channel('http.server.request.start'); -const onServerResponseFinish = dc.channel('http.server.response.finish'); - const isHTTPServer = (server) => server instanceof http.Server; const isIncomingMessage = (object) => object instanceof http.IncomingMessage; const isOutgoingMessage = (object) => object instanceof http.OutgoingMessage; const isNetSocket = (socket) => socket instanceof net.Socket; -onClientRequestStart.subscribe(common.mustCall(({ request }) => { +dc.subscribe('http.client.request.start', common.mustCall(({ request }) => { assert.strictEqual(isOutgoingMessage(request), true); })); -onClientResponseFinish.subscribe(common.mustCall(({ request, response }) => { +dc.subscribe('http.client.response.finish', common.mustCall(({ + request, + response +}) => { assert.strictEqual(isOutgoingMessage(request), true); assert.strictEqual(isIncomingMessage(response), true); })); -onServerRequestStart.subscribe(common.mustCall(({ +dc.subscribe('http.server.request.start', common.mustCall(({ request, response, socket, @@ -36,7 +34,7 @@ onServerRequestStart.subscribe(common.mustCall(({ assert.strictEqual(isHTTPServer(server), true); })); -onServerResponseFinish.subscribe(common.mustCall(({ +dc.subscribe('http.server.response.finish', common.mustCall(({ request, response, socket,