-
Notifications
You must be signed in to change notification settings - Fork 16
/
Copy pathindex.js
120 lines (103 loc) · 3.11 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
/**
 * Decide whether an event name is covered by a subscription list.
 *
 * @param {Array<string|RegExp>|null|undefined} subscriptionList - Patterns the
 *   subscriber is interested in. A falsy value means "no filter" and matches
 *   every event. An empty array matches nothing.
 * @param {string} eventName - Event name to check.
 * @returns {boolean} True when there is no filter, or when at least one entry
 *   equals the name (strings, strict equality) or matches it (RegExp).
 */
function hasEventMatch(subscriptionList, eventName) {
  if (!subscriptionList) return true;
  return subscriptionList.some(pat => {
    if (pat instanceof RegExp) {
      // Patterns carrying the `g` or `y` flag remember `lastIndex` between
      // calls to `test()`. Without this reset, a reused pattern would match
      // the same event name only on alternate calls.
      pat.lastIndex = 0;
      return pat.test(eventName);
    }
    return pat === eventName;
  });
}
module.exports = class SSEChannel {
constructor(options) {
this.options = Object.assign({}, {
pingInterval: 3000,
maxStreamDuration: 30000,
clientRetryInterval: 1000,
startId: 1,
historySize: 100,
rewind: 0,
cors: false
}, options);
this.nextID = this.options.startId;
this.clients = new Set();
this.messages = [];
this.active = true;
if (this.options.pingInterval) {
this.pingTimer = setInterval(() => this.publish(), this.options.pingInterval);
}
}
publish(data, eventName) {
if (!this.active) throw new Error('Channel closed');
let output;
let id;
if (!data && !eventName) {
if (!this.clients.size) return; // No need to create a ping entry if there are no clients connected
output = "data: \n\n";
} else {
id = this.nextID++;
if (typeof data === "object") data = JSON.stringify(data);
data = data ? data.split(/[\r\n]+/).map(str => 'data: '+str).join('\n') : '';
output = (
"id: " + id + "\n" +
(eventName ? "event: " + eventName + "\n" : "") +
(data || "data: ") + '\n\n'
);
this.messages.push({ id, eventName, output });
}
[...this.clients].filter(c => !eventName || hasEventMatch(c.events, eventName)).forEach(c => c.res.write(output));
while (this.messages.length > this.options.historySize) {
this.messages.shift();
}
return id;
}
subscribe(req, res, events) {
if (!this.active) throw new Error('Channel closed');
const c = {req, res, events};
const headers = {
"Content-Type": "text/event-stream",
"Cache-Control": "s-maxage="+(Math.floor(this.options.maxStreamDuration/1000)-1)+", max-age=0, stale-while-revalidate=0, stale-if-error=0, no-transform",
"Connection": "keep-alive"
}
if (this.options.cors) {
headers['access-control-allow-origin'] = "*";
}
c.req.socket.setNoDelay(true);
c.res.writeHead(200, headers);
let body = "retry: " + this.options.clientRetryInterval + '\n\n';
const lastID = Number.parseInt(req.headers['last-event-id'], 10);
const rewind = (!Number.isNaN(lastID)) ? ((this.nextID-1)-lastID) : this.options.rewind;
if (rewind) {
this.messages.filter(m => hasEventMatch(c.events, m.eventName)).slice(0-rewind).forEach(m => {
body += m.output
});
}
c.res.write(body);
this.clients.add(c);
setTimeout(() => {
if (!c.res.finished) {
this.unsubscribe(c);
}
}, this.options.maxStreamDuration);
c.res.on('close', () => this.unsubscribe(c));
return c;
}
unsubscribe(c) {
c.res.end();
this.clients.delete(c);
}
close() {
this.clients.forEach(c => c.res.end());
this.clients = new Set();
this.messages = [];
if (this.pingTimer) clearInterval(this.pingTimer);
this.active = false;
}
listClients() {
const rollupByIP = {};
this.clients.forEach(c => {
const ip = c.req.connection.remoteAddress;
if (!(ip in rollupByIP)) {
rollupByIP[ip] = 0;
}
rollupByIP[ip]++;
});
return rollupByIP;
}
getSubscriberCount() {
return this.clients.size;
}
};