Skip to content

Commit

Permalink
feat: add option to allow usage of custom parser (#471)
Browse files Browse the repository at this point in the history
This PR adds a new parser option to the adapter constructor to allow
setting a custom parser to use, defaulting to using notepack.io. This
would allow someone to use a different msgpack library if they wanted,
or even an entirely different protocol altogether (e.g. protobuf).

Related:

- #462
- #469
  • Loading branch information
MasterOdin authored and darrachequesne committed Dec 7, 2022
1 parent d2977a3 commit 73f6320
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 6 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ The following options are allowed:

- `key`: the name of the key to pub/sub events on as prefix (`socket.io`)
- `requestsTimeout`: optional, after this timeout the adapter will stop waiting from responses to request (`5000ms`)
- `parser`: optional, parser to use for encoding and decoding messages passed through Redis ([`notepack.io`](https://www.npmjs.com/package/notepack.io))

### RedisAdapter

Expand All @@ -205,6 +206,7 @@ that a regular `Adapter` does not
- `pubClient`
- `subClient`
- `requestsTimeout`
- `parser`

### RedisAdapter#sockets(rooms: Set<String>)

Expand Down
24 changes: 18 additions & 6 deletions lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ interface AckRequest {
ack: (...args: any[]) => void;
}

interface Parser {
decode: (msg: any) => any;
encode: (msg: any) => any;
}

const isNumeric = (str) => !isNaN(str) && !isNaN(parseFloat(str));

export interface RedisAdapterOptions {
Expand All @@ -62,6 +67,11 @@ export interface RedisAdapterOptions {
* @default false
*/
publishOnSpecificResponseChannel: boolean;
/**
* The parser to use for encoding and decoding messages sent to Redis.
* This option defaults to using `notepack.io`, a MessagePack implementation.
*/
parser: Parser;
}

/**
Expand All @@ -87,6 +97,7 @@ export class RedisAdapter extends Adapter {
public readonly uid;
public readonly requestsTimeout: number;
public readonly publishOnSpecificResponseChannel: boolean;
public readonly parser: Parser;

private readonly channel: string;
private readonly requestChannel: string;
Expand Down Expand Up @@ -115,6 +126,7 @@ export class RedisAdapter extends Adapter {
this.uid = uid2(6);
this.requestsTimeout = opts.requestsTimeout || 5000;
this.publishOnSpecificResponseChannel = !!opts.publishOnSpecificResponseChannel;
this.parser = opts.parser || msgpack;

const prefix = opts.key || "socket.io";

Expand Down Expand Up @@ -181,7 +193,7 @@ export class RedisAdapter extends Adapter {
return debug("ignore unknown room %s", room);
}

const args = msgpack.decode(msg);
const args = this.parser.decode(msg);

const [uid, packet, opts] = args;
if (this.uid === uid) return debug("ignore same uid");
Expand Down Expand Up @@ -226,7 +238,7 @@ export class RedisAdapter extends Adapter {
if (msg[0] === 0x7b) {
request = JSON.parse(msg.toString());
} else {
request = msgpack.decode(msg);
request = this.parser.decode(msg);
}
} catch (err) {
debug("ignoring malformed request");
Expand Down Expand Up @@ -424,7 +436,7 @@ export class RedisAdapter extends Adapter {

this.publishResponse(
request,
msgpack.encode({
this.parser.encode({
type: RequestType.BROADCAST_ACK,
requestId: request.requestId,
packet: arg,
Expand Down Expand Up @@ -467,7 +479,7 @@ export class RedisAdapter extends Adapter {
if (msg[0] === 0x7b) {
response = JSON.parse(msg.toString());
} else {
response = msgpack.decode(msg);
response = this.parser.decode(msg);
}
} catch (err) {
debug("ignoring malformed response");
Expand Down Expand Up @@ -596,7 +608,7 @@ export class RedisAdapter extends Adapter {
except: [...new Set(opts.except)],
flags: opts.flags,
};
const msg = msgpack.encode([this.uid, packet, rawOpts]);
const msg = this.parser.encode([this.uid, packet, rawOpts]);
let channel = this.channel;
if (opts.rooms && opts.rooms.size === 1) {
channel += opts.rooms.keys().next().value + "#";
Expand Down Expand Up @@ -626,7 +638,7 @@ export class RedisAdapter extends Adapter {
flags: opts.flags,
};

const request = msgpack.encode({
const request = this.parser.encode({
uid: this.uid,
requestId,
type: RequestType.BROADCAST,
Expand Down
85 changes: 85 additions & 0 deletions test/custom-parser.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import { createServer } from "http";
import { Server, Socket as ServerSocket } from "socket.io";
import { io as ioc, Socket as ClientSocket } from "socket.io-client";
import { createAdapter } from "../lib";
import { createClient } from "redis";
import { AddressInfo } from "net";
import { times } from "./util";
import expect = require("expect.js");

const NODES_COUNT = 3;

describe("custom parser", () => {
let servers: Server[] = [],
serverSockets: ServerSocket[] = [],
clientSockets: ClientSocket[] = [],
redisClients: any[] = [];

beforeEach(async () => {
for (let i = 1; i <= NODES_COUNT; i++) {
const httpServer = createServer();
const pubClient = createClient();
const subClient = createClient();

await Promise.all([pubClient.connect(), subClient.connect()]);

redisClients.push(pubClient, subClient);

const io = new Server(httpServer, {
adapter: createAdapter(pubClient, subClient, {
parser: {
decode(msg) {
return JSON.parse(msg);
},
encode(msg) {
return JSON.stringify(msg);
},
},
}),
});

await new Promise((resolve) => {
httpServer.listen(() => {
const port = (httpServer.address() as AddressInfo).port;
const clientSocket = ioc(`http://localhost:${port}`);

io.on("connection", async (socket) => {
clientSockets.push(clientSocket);
serverSockets.push(socket);
servers.push(io);
resolve();
});
});
});
}
});

afterEach(() => {
servers.forEach((server) => {
// @ts-ignore
server.httpServer.close();
server.of("/").adapter.close();
});
clientSockets.forEach((socket) => {
socket.disconnect();
});
redisClients.forEach((redisClient) => {
redisClient.disconnect();
});
});

it("broadcasts", (done) => {
const partialDone = times(3, done);

clientSockets.forEach((clientSocket) => {
clientSocket.on("test", (arg1, arg2, arg3) => {
expect(arg1).to.eql(1);
expect(arg2).to.eql("2");
expect(arg3).to.eql([3]);
partialDone();
});
});

servers[0].emit("test", 1, "2", [3]);
});
});
2 changes: 2 additions & 0 deletions test/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -562,3 +562,5 @@ function cleanup(done) {
}
done();
}

require("./custom-parser");
10 changes: 10 additions & 0 deletions test/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,13 @@ Assertion.prototype.contain = function (...args) {
}
return contain.apply(this, args);
};

export function times(count: number, fn: () => void) {
let i = 0;
return () => {
i++;
if (i === count) {
fn();
}
};
}

0 comments on commit 73f6320

Please sign in to comment.