Skip to content

Commit

Permalink
lib: improved diagnostics_channel subscribe/unsubscribe
Browse files Browse the repository at this point in the history
Adds a new top-level subscribe/unsubscribe which will ref/unref the
channel WeakReference to prevent subscriptions from getting garbage
collected.

PR-URL: #42714
Reviewed-By: James M Snell <[email protected]>
Reviewed-By: Chengzhong Wu <[email protected]>
Reviewed-By: Gerhard Stöbich <[email protected]>
Reviewed-By: Rafael Gonzaga <[email protected]>
Reviewed-By: Vladimir de Turckheim <[email protected]>
  • Loading branch information
Stephen Belanger authored and danielleadams committed Jul 26, 2022
1 parent 2ec8092 commit 324473c
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 7 deletions.
95 changes: 88 additions & 7 deletions doc/api/diagnostics_channel.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ import diagnostics_channel from 'node:diagnostics_channel';
// Get a reusable channel object
const channel = diagnostics_channel.channel('my-channel');

// Subscribe to the channel
channel.subscribe((message, name) => {
function onMessage(message, name) {
// Received data
});
}

// Subscribe to the channel
diagnostics_channel.subscribe('my-channel', onMessage);

// Check if the channel has an active subscriber
if (channel.hasSubscribers) {
Expand All @@ -55,6 +57,9 @@ if (channel.hasSubscribers) {
some: 'data'
});
}

// Unsubscribe from the channel
diagnostics_channel.unsubscribe('my-channel', onMessage);
```

```cjs
Expand All @@ -63,10 +68,12 @@ const diagnostics_channel = require('node:diagnostics_channel');
// Get a reusable channel object
const channel = diagnostics_channel.channel('my-channel');

// Subscribe to the channel
channel.subscribe((message, name) => {
function onMessage(message, name) {
// Received data
});
}

// Subscribe to the channel
diagnostics_channel.subscribe('my-channel', onMessage);

// Check if the channel has an active subscriber
if (channel.hasSubscribers) {
Expand All @@ -75,6 +82,9 @@ if (channel.hasSubscribers) {
some: 'data'
});
}

// Unsubscribe from the channel
diagnostics_channel.unsubscribe('my-channel', onMessage);
```

#### `diagnostics_channel.hasSubscribers(name)`
Expand Down Expand Up @@ -121,7 +131,7 @@ added:
* `name` {string|symbol} The channel name
* Returns: {Channel} The named channel object

This is the primary entry-point for anyone wanting to interact with a named
This is the primary entry-point for anyone wanting to publish to a named
channel. It produces a channel object which is optimized to reduce overhead at
publish time as much as possible.

Expand All @@ -137,6 +147,76 @@ const diagnostics_channel = require('node:diagnostics_channel');
const channel = diagnostics_channel.channel('my-channel');
```

#### `diagnostics_channel.subscribe(name, onMessage)`

<!-- YAML
added:
- REPLACEME
-->

* `name` {string|symbol} The channel name
* `onMessage` {Function} The handler to receive channel messages
* `message` {any} The message data
* `name` {string|symbol} The name of the channel

Register a message handler to subscribe to this channel. This message handler
will be run synchronously whenever a message is published to the channel. Any
errors thrown in the message handler will trigger an [`'uncaughtException'`][].

```mjs
import diagnostics_channel from 'diagnostics_channel';

diagnostics_channel.subscribe('my-channel', (message, name) => {
// Received data
});
```

```cjs
const diagnostics_channel = require('diagnostics_channel');

diagnostics_channel.subscribe('my-channel', (message, name) => {
// Received data
});
```

#### `diagnostics_channel.unsubscribe(name, onMessage)`

<!-- YAML
added:
- REPLACEME
-->

* `name` {string|symbol} The channel name
* `onMessage` {Function} The previous subscribed handler to remove
* Returns: {boolean} `true` if the handler was found, `false` otherwise.

Remove a message handler previously registered to this channel with
[`diagnostics_channel.subscribe(name, onMessage)`][].

```mjs
import diagnostics_channel from 'diagnostics_channel';

function onMessage(message, name) {
// Received data
}

diagnostics_channel.subscribe('my-channel', onMessage);

diagnostics_channel.unsubscribe('my-channel', onMessage);
```

```cjs
const diagnostics_channel = require('diagnostics_channel');

function onMessage(message, name) {
// Received data
}

diagnostics_channel.subscribe('my-channel', onMessage);

diagnostics_channel.unsubscribe('my-channel', onMessage);
```

### Class: `Channel`

<!-- YAML
Expand Down Expand Up @@ -344,4 +424,5 @@ Emitted when server sends a response.

[`'uncaughtException'`]: process.md#event-uncaughtexception
[`channel.subscribe(onMessage)`]: #channelsubscribeonmessage
[`diagnostics_channel.subscribe(name, onMessage)`]: #diagnostics_channelunsubscribename_onmessage
[`diagnostics_channel.channel(name)`]: #diagnostics_channelchannelname
18 changes: 18 additions & 0 deletions lib/diagnostics_channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,22 @@ function channel(name) {
return channel;
}

function subscribe(name, subscription) {
const chan = channel(name);
channels[name].incRef();
chan.subscribe(subscription);
}

function unsubscribe(name, subscription) {
const chan = channel(name);
if (!chan.unsubscribe(subscription)) {
return false;
}

channels[name].decRef();
return true;
}

function hasSubscribers(name) {
let channel;
const ref = channels[name];
Expand All @@ -123,5 +139,7 @@ function hasSubscribers(name) {
module.exports = {
channel,
hasSubscribers,
subscribe,
unsubscribe,
Channel
};
44 changes: 44 additions & 0 deletions test/parallel/test-diagnostics-channel-pub-sub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
'use strict';

const common = require('../common');
const dc = require('diagnostics_channel');
const assert = require('assert');
const { Channel } = dc;

const name = 'test';
const input = {
foo: 'bar'
};

// Individual channel objects can be created to avoid future lookups
const channel = dc.channel(name);
assert.ok(channel instanceof Channel);

// No subscribers yet, should not publish
assert.ok(!channel.hasSubscribers);

const subscriber = common.mustCall((message, name) => {
assert.strictEqual(name, channel.name);
assert.deepStrictEqual(message, input);
});

// Now there's a subscriber, should publish
dc.subscribe(name, subscriber);
assert.ok(channel.hasSubscribers);

// The ActiveChannel prototype swap should not fail instanceof
assert.ok(channel instanceof Channel);

// Should trigger the subscriber once
channel.publish(input);

// Should not publish after subscriber is unsubscribed
assert.ok(dc.unsubscribe(name, subscriber));
assert.ok(!channel.hasSubscribers);

// unsubscribe() should return false when subscriber is not found
assert.ok(!dc.unsubscribe(name, subscriber));

assert.throws(() => {
dc.subscribe(name, null);
}, { code: 'ERR_INVALID_ARG_TYPE' });

0 comments on commit 324473c

Please sign in to comment.