Skip to content

Commit

Permalink
diagnostics_channel: fix diagnostics channel
Browse files Browse the repository at this point in the history
  • Loading branch information
theanarkh committed Aug 6, 2022
1 parent 90c758c commit 4028642
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 40 deletions.
12 changes: 6 additions & 6 deletions lib/_http_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ const {
const kClientRequestStatistics = Symbol('ClientRequestStatistics');

const dc = require('diagnostics_channel');
const onClientRequestStartChannel = dc.channel('http.client.request.start');
const onClientResponseFinishChannel = dc.channel('http.client.response.finish');
const CLIENT_REQUEST_START_CHANNEL = 'http.client.request.start';
const CLIENT_RESPONSE_FINISH_CHANNEL = 'http.client.response.finish';

const { addAbortSignal, finished } = require('stream');

Expand Down Expand Up @@ -374,8 +374,8 @@ ClientRequest.prototype._finish = function _finish() {
},
});
}
if (onClientRequestStartChannel.hasSubscribers) {
onClientRequestStartChannel.publish({
if (dc.hasSubscribers(CLIENT_REQUEST_START_CHANNEL)) {
dc.publish(CLIENT_REQUEST_START_CHANNEL, {
request: this,
});
}
Expand Down Expand Up @@ -660,8 +660,8 @@ function parserOnIncomingClient(res, shouldKeepAlive) {
},
});
}
if (onClientResponseFinishChannel.hasSubscribers) {
onClientResponseFinishChannel.publish({
if (dc.hasSubscribers(CLIENT_RESPONSE_FINISH_CHANNEL)) {
dc.publish(CLIENT_RESPONSE_FINISH_CHANNEL, {
request: req,
response: res,
});
Expand Down
12 changes: 6 additions & 6 deletions lib/_http_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ let debug = require('internal/util/debuglog').debuglog('http', (fn) => {
});

const dc = require('diagnostics_channel');
const onRequestStartChannel = dc.channel('http.server.request.start');
const onResponseFinishChannel = dc.channel('http.server.response.finish');
const SERVER_REQUEST_START_CHANNEL = 'http.server.request.start';
const SERVER_RESPONSE_FINISH_CHANNEL = 'http.server.response.finish';

const kServerResponse = Symbol('ServerResponse');
const kServerResponseStatistics = Symbol('ServerResponseStatistics');
Expand Down Expand Up @@ -870,8 +870,8 @@ function clearIncoming(req) {
}

function resOnFinish(req, res, socket, state, server) {
if (onResponseFinishChannel.hasSubscribers) {
onResponseFinishChannel.publish({
if (dc.hasSubscribers(SERVER_RESPONSE_FINISH_CHANNEL)) {
dc.publish(SERVER_RESPONSE_FINISH_CHANNEL, {
request: req,
response: res,
socket,
Expand Down Expand Up @@ -961,8 +961,8 @@ function parserOnIncoming(server, socket, state, req, keepAlive) {
res.shouldKeepAlive = keepAlive;
res[kUniqueHeaders] = server[kUniqueHeaders];

if (onRequestStartChannel.hasSubscribers) {
onRequestStartChannel.publish({
if (dc.hasSubscribers(SERVER_REQUEST_START_CHANNEL)) {
dc.publish(SERVER_REQUEST_START_CHANNEL, {
request: req,
response: res,
socket,
Expand Down
6 changes: 3 additions & 3 deletions lib/dgram.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ const {
} = internalBinding('udp_wrap');

const dc = require('diagnostics_channel');
const udpSocketChannel = dc.channel('udp.socket');
const UDP_SOCKET_CHANNEL = 'udp.socket';

const BIND_STATE_UNBOUND = 0;
const BIND_STATE_BINDING = 1;
Expand Down Expand Up @@ -148,8 +148,8 @@ function Socket(type, listener) {
this.once('close', () => signal.removeEventListener('abort', onAborted));
}
}
if (udpSocketChannel.hasSubscribers) {
udpSocketChannel.publish({
if (dc.hasSubscribers(UDP_SOCKET_CHANNEL)) {
dc.publish(UDP_SOCKET_CHANNEL, {
socket: this,
});
}
Expand Down
14 changes: 13 additions & 1 deletion lib/diagnostics_channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,22 @@ function hasSubscribers(name) {
return channel.hasSubscribers;
}

function publish(name, data) {
let channel;
const ref = channels[name];
if (ref) channel = ref.get();
if (!channel) {
return false;
}
channel.publish(data);
return true;
}

module.exports = {
channel,
Channel,
hasSubscribers,
subscribe,
unsubscribe,
Channel
publish,
};
19 changes: 9 additions & 10 deletions lib/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,18 +131,17 @@ const noop = () => {};

const kPerfHooksNetConnectContext = Symbol('kPerfHooksNetConnectContext');

let netClientSocketChannel;
let netServerSocketChannel;
const CLIENT_SOCKET_CHANNEL = 'net.client.socket';
const SERVER_SOCKET_CHANNEL = 'net.server.socket';
let dc;
function lazyChannels() {
// TODO(joyeecheung): support diagnostics channels in the snapshot.
// For now it is fine to create them lazily when there isn't a snapshot to
// build. If users need the channels they would have to create them first
// before invoking any built-ins that would publish to these channels
// anyway.
if (netClientSocketChannel === undefined) {
const dc = require('diagnostics_channel');
netClientSocketChannel = dc.channel('net.client.socket');
netServerSocketChannel = dc.channel('net.server.socket');
if (dc === undefined) {
dc = require('diagnostics_channel');
}
}

Expand Down Expand Up @@ -218,8 +217,8 @@ function connect(...args) {
debug('createConnection', normalized);
const socket = new Socket(options);
lazyChannels();
if (netClientSocketChannel.hasSubscribers) {
netClientSocketChannel.publish({
if (dc.hasSubscribers(CLIENT_SOCKET_CHANNEL)) {
dc.publish(CLIENT_SOCKET_CHANNEL, {
socket,
});
}
Expand Down Expand Up @@ -1762,8 +1761,8 @@ function onconnection(err, clientHandle) {
socket._server = self;
self.emit('connection', socket);
lazyChannels();
if (netServerSocketChannel.hasSubscribers) {
netServerSocketChannel.publish({
if (dc.hasSubscribers(SERVER_SOCKET_CHANNEL)) {
dc.publish(SERVER_SOCKET_CHANNEL, {
socket,
});
}
Expand Down
7 changes: 2 additions & 5 deletions test/parallel/test-diagnostics-channel-http-server-start.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,18 @@ const dc = require('diagnostics_channel');
const assert = require('assert');
const http = require('http');

const incomingStartChannel = dc.channel('http.server.request.start');
const outgoingFinishChannel = dc.channel('http.server.response.finish');

const als = new AsyncLocalStorage();
let context;

// Bind requests to an AsyncLocalStorage context
incomingStartChannel.subscribe(common.mustCall((message) => {
dc.subscribe('http.server.request.start', common.mustCall((message) => {
als.enterWith(message);
context = message;
}));

// When the request ends, verify the context has been maintained
// and that the messages contain the expected data
outgoingFinishChannel.subscribe(common.mustCall((message) => {
dc.subscribe('http.server.response.finish', common.mustCall((message) => {
const data = {
request,
response,
Expand Down
16 changes: 7 additions & 9 deletions test/parallel/test-diagnostics-channel-http.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,24 @@ const http = require('http');
const net = require('net');
const dc = require('diagnostics_channel');

const onClientRequestStart = dc.channel('http.client.request.start');
const onClientResponseFinish = dc.channel('http.client.response.finish');
const onServerRequestStart = dc.channel('http.server.request.start');
const onServerResponseFinish = dc.channel('http.server.response.finish');

const isHTTPServer = (server) => server instanceof http.Server;
const isIncomingMessage = (object) => object instanceof http.IncomingMessage;
const isOutgoingMessage = (object) => object instanceof http.OutgoingMessage;
const isNetSocket = (socket) => socket instanceof net.Socket;

onClientRequestStart.subscribe(common.mustCall(({ request }) => {
dc.subscribe('http.client.request.start', common.mustCall(({ request }) => {
assert.strictEqual(isOutgoingMessage(request), true);
}));

onClientResponseFinish.subscribe(common.mustCall(({ request, response }) => {
dc.subscribe('http.client.response.finish', common.mustCall(({
request,
response
}) => {
assert.strictEqual(isOutgoingMessage(request), true);
assert.strictEqual(isIncomingMessage(response), true);
}));

onServerRequestStart.subscribe(common.mustCall(({
dc.subscribe('http.server.request.start', common.mustCall(({
request,
response,
socket,
Expand All @@ -36,7 +34,7 @@ onServerRequestStart.subscribe(common.mustCall(({
assert.strictEqual(isHTTPServer(server), true);
}));

onServerResponseFinish.subscribe(common.mustCall(({
dc.subscribe('http.server.response.finish', common.mustCall(({
request,
response,
socket,
Expand Down

0 comments on commit 4028642

Please sign in to comment.