-
Notifications
You must be signed in to change notification settings - Fork 4
/
plugs.js
163 lines (149 loc) · 4.51 KB
/
plugs.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import { basename, dirname, globToRegExp, readLines, walk } from './deps.ts'
import './plugs/builtin/validate/validate-sub.js'
import './plugs/builtin/validate/validate-event.js'
import './plugs/builtin/limits/limits.js'
import './plugs/builtin/stats/stats.js'
import CONFIG from './conf.js'
// Registry of plug workers, keyed by action name. plugInit() pushes a worker
// onto the list of every action it registers for; plugsAction() iterates the
// list for the action being dispatched.
const plugs = Object.fromEntries(
  [
    'connect',
    'disconnect',
    'sub',
    'unsub',
    'eose',
    'event',
    'notice',
    'error',
  ].map((action) => [action, []]),
)
/**
 * Collect skip patterns from every `.plugsignore` file under the plugs dir.
 *
 * Each non-comment, non-blank line is turned into a RegExp via globToRegExp.
 * A line starting with `/` is anchored to the directory containing the
 * ignore file; any other line matches at any depth below that directory.
 *
 * @returns {Promise<RegExp[]>} patterns suitable for walk()'s `skip` option
 */
async function getIgnorePatterns() {
  const patterns = []
  const ignoreFiles = walk(CONFIG.plugs.dir, {
    // walk() expects RegExps here; anchor on the file name so that only files
    // literally called ".plugsignore" are picked up
    match: [/(^|[\\/])\.plugsignore$/],
    includeDirs: false,
  })
  for await (const entry of ignoreFiles) {
    const dir = dirname(entry.path)
    const file = await Deno.open(entry.path)
    try {
      for await (const line of readLines(file)) {
        // a blank line would otherwise become `dir/**/` and skip whole subtrees
        if (line.trim() === '' || line.startsWith('#')) continue
        const prefix = line.startsWith('/') ? dir : `${dir}/**/`
        patterns.push(globToRegExp(prefix + line))
      }
    } finally {
      // readLines() does not close the reader; release the descriptor ourselves
      file.close()
    }
  }
  return patterns
}
// The plugs dir is not present once the project is compiled, so the builtin
// plug modules are listed statically here (they are also imported at the top
// of this file).
//
// Returns a Set of absolute module URLs, resolved against this module, for
// every builtin group named in CONFIG.plugs.builtin.use.
function getBuiltins() {
  const enabled = CONFIG.plugs.builtin.use
  const modulesByGroup = [
    ['validate', [
      './plugs/builtin/validate/validate-sub.js',
      './plugs/builtin/validate/validate-event.js',
    ]],
    ['limits', ['./plugs/builtin/limits/limits.js']],
    ['stats', ['./plugs/builtin/stats/stats.js']],
  ]
  const hrefs = new Set()
  for (const [group, modules] of modulesByGroup) {
    if (!enabled.includes(group)) continue
    for (const relPath of modules) {
      hrefs.add(new URL(relPath, import.meta.url).href)
    }
  }
  return hrefs
}
// Derive a worker name from a plug path or URL: its basename without a
// trailing .js/.ts extension.
function plugWorkerName(path) {
  return basename(path).replace(/\.(js|ts)$/, '')
}

/**
 * Start every plug as a module worker and register it through plugInit().
 *
 * User plugs are discovered by walking CONFIG.plugs.dir for .js/.ts files,
 * skipping anything matched by the collected ignore patterns. The enabled
 * builtin plugs are started afterwards. Plugs are initialised one at a time.
 *
 * @returns {Promise<void>}
 * @throws whatever the directory walk or plugInit() throws, except a
 *   Deno NotFound error (directly or as `cause`), which is ignored
 */
export async function plugsInit() {
  const builtins = getBuiltins()
  try {
    const ignorePatterns = await getIgnorePatterns()
    const plugFiles = walk(CONFIG.plugs.dir, {
      exts: ['.js', '.ts'],
      skip: ignorePatterns,
    })
    for await (const p of plugFiles) {
      const href = new URL(p.path, import.meta.url).href
      // start the worker and ask it which events it wants to listen to
      const worker = new Worker(href, {
        type: 'module',
        name: plugWorkerName(p.path),
      })
      await plugInit(worker, p.path)
    }
  } catch (e) {
    // a missing plugs dir is tolerated; builtins are still started below.
    // `e?.` guards against a thrown nullish value masking the real error.
    const isNotFound = e instanceof Deno.errors.NotFound ||
      e?.cause instanceof Deno.errors.NotFound
    if (!isNotFound) {
      throw e
    }
  }
  for (const builtin of builtins) {
    const name = plugWorkerName(builtin)
    const worker = new Worker(builtin, { type: 'module', name })
    await plugInit(worker, `${name} (builtin)`)
  }
}
// provide async req<->resp messaging to plugs
// every message sent to a plug expecting a response gets a unique msgId
// plugs include the msgId in their response
const msgPromises = new Map() // msgId -> { resolve, reject }
let msgIdSeq = 0

/**
 * Produce the next message id.
 *
 * Ids count up from 1 and wrap back to 1 after Number.MAX_SAFE_INTEGER.
 * 0 is never issued, so a truthiness check on `msgId` cannot mistake a valid
 * id for a missing one.
 *
 * @returns {number} a positive safe integer
 */
function nextMsgId() {
  msgIdSeq = msgIdSeq >= Number.MAX_SAFE_INTEGER ? 1 : msgIdSeq + 1
  return msgIdSeq
}
// actions a plug cannot reject: they are broadcast without waiting for a reply
const NOTIFY_ONLY_ACTIONS = new Set([
  'eose',
  'disconnect',
  'error',
  'notice',
  'unsub',
])

/**
 * Dispatch an action to every plug registered for it.
 *
 * Notify-only actions are posted to each worker and the call returns
 * immediately. For every other action each worker receives a message tagged
 * with a fresh msgId, and the call resolves once all of them have accepted.
 *
 * @param {string} action - key of the `plugs` registry
 * @param {*} conn - passed through to the plug unchanged
 * @param {*} data - passed through to the plug unchanged
 * @returns {Promise<void>}
 * @throws {Error} the first rejection from a plug, or the error raised by
 *   postMessage if a message could not be sent
 */
export async function plugsAction(action, conn, data) {
  const workers = plugs[action]
  if (NOTIFY_ONLY_ACTIONS.has(action)) {
    for (const worker of workers) {
      worker.postMessage({ action, conn, data })
    }
    return
  }
  await Promise.all(
    workers.map((worker) =>
      new Promise((resolve, reject) => {
        const msgId = nextMsgId()
        msgPromises.set(msgId, { resolve, reject })
        try {
          worker.postMessage({ msgId, action, conn, data })
        } catch (err) {
          // the message never left, so no reply will ever clear this entry
          msgPromises.delete(msgId)
          reject(err)
        }
      })
    ),
  )
}
/**
 * Register a plug worker and install its reply handler.
 *
 * Sends 'getactions' to the worker and waits up to 5s for it to answer with
 * an array of action names; the worker is added to the `plugs` registry for
 * each one (case-insensitively). An unknown action name terminates the
 * process. Afterwards the worker's messages are routed to the pending
 * request promises in `msgPromises` by msgId.
 *
 * @param {Worker} worker - the plug's worker
 * @param {string} name - label used in log and error messages
 * @returns {Promise<void>}
 * @throws {Error} if the worker errors, answers with a non-array, or does
 *   not answer within 5s
 */
async function plugInit(worker, name) {
  // register the plug
  let timer
  try {
    await new Promise((resolve, reject) => {
      timer = setTimeout(() =>
        reject(
          new Error(
            `plug ${name} did not respond to 'getactions' within 5s. Is it a web worker?`,
          ),
        ), 5000)
      worker.onmessage = ({ data }) => {
        if (!Array.isArray(data)) {
          // reject instead of throwing inside the event handler
          reject(
            new Error(
              `plug ${name} answered 'getactions' with a non-array value`,
            ),
          )
          return
        }
        for (const action of data) {
          const key = String(action).toLowerCase()
          if (Object.hasOwn(plugs, key)) {
            plugs[key].push(worker)
          } else {
            console.error(`plug ${name} registered for unknown action ${action}`)
            Deno.exit(1)
          }
        }
        console.log(`plug ${name} registered for actions: ${data.join(', ')}`)
        resolve()
      }
      worker.onerror = reject
      worker.postMessage('getactions')
    })
  } finally {
    // don't leave the 5s timer pending once the handshake has settled
    clearTimeout(timer)
  }
  // listen for messages from the plug
  worker.onmessage = ({ data }) => {
    // `!= null` rather than truthiness so that a msgId of 0 is still routed
    const pending = data?.msgId != null
      ? msgPromises.get(data.msgId)
      : undefined
    if (!pending) {
      console.log(`plug ${name} unexpectedly emitted data: ${data}`)
      return
    }
    msgPromises.delete(data.msgId)
    if (data.accept) {
      pending.resolve()
    } else {
      pending.reject(new Error(data.reason))
    }
  }
  worker.onerror = console.error
}