-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathsse_stream.rb
372 lines (315 loc) · 11.1 KB
/
sse_stream.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
require_relative 'sse_client'
Plugin.create(:worldon) do
# Shorthand for the plugin namespace; the handlers below reach World, Instance, etc. through it.
pm = Plugin::Worldon
# Declares the stream-start event (original note: "stream start & fetch-latest event").
# The handler registered for it in this file takes (domain, type, slug, world, list_id).
defevent :worldon_start_stream, prototype: [String, String, String, pm::World, Integer]
# Tells whether any configured extract tab lists +slug+ among its sources.
#
# slug        - datasource slug to look for
# include_all - when truthy, a tab that lists :worldon_appear_toots also counts
#
# Returns false when no extract tabs are configured at all.
def datasource_used?(slug, include_all = false)
  tabs = UserConfig[:extract_tabs]
  return false if tabs.nil?

  tabs.any? do |tab|
    tab[:sources].any? do |source|
      next true if source == slug

      include_all && source == :worldon_appear_toots
    end
  end
end
# Opens one streaming (SSE) connection for a datasource by firing :sse_create.
#
# domain  - instance host name; the streaming URL is built from it
# type    - one of 'user', 'public', 'public:media', 'public:local',
#           'public:local:media', 'list', 'direct'
# slug    - datasource slug, passed to :sse_create as the connection key
# world   - when it is a pm::World its access token is sent as a Bearer header;
#           anything else yields an unauthenticated request
# list_id - only read for type 'list'
#
# Does nothing when UserConfig[:worldon_enable_streaming] is falsy. An
# unrecognised type is logged and skipped instead of requesting a nil URI.
on_worldon_start_stream do |domain, type, slug, world, list_id|
  next unless UserConfig[:worldon_enable_streaming]

  params = {}
  endpoint =
    case type
    when 'user', 'public', 'direct'
      type
    when 'public:media'
      params[:only_media] = true
      'public'
    when 'public:local'
      'public/local'
    when 'public:local:media'
      params[:only_media] = true
      'public/local'
    when 'list'
      params[:list] = list_id
      'list'
    end

  if endpoint.nil?
    error "Worldon: unknown stream type #{type.inspect} (#{slug})"
    next
  end

  Thread.new {
    # Random 0-9 second delay before connecting (presumably to stagger
    # connections that are started together -- confirm).
    sleep(rand(10))

    token = world.is_a?(pm::World) ? world.access_token : nil
    uri = Diva::URI.new("https://#{domain}/api/v1/streaming/#{endpoint}")

    headers = {}
    headers["Authorization"] = "Bearer " + token if token

    Plugin.call(:sse_create, slug, :get, uri, headers, params, domain: domain, type: type, token: token)
  }
end
# Stops the stream of one datasource: fires :sse_kill_connection with its slug.
on_worldon_stop_stream { |datasource_slug| Plugin.call(:sse_kill_connection, datasource_slug) }
# An arbitrary timestamp that is self-evidently more than 60 seconds in the past for mikutter.
@last_all_restarted = Time.new(2007, 8, 31, 0, 0, 0, "+09:00")
# Restart-request flag: raised by the :worldon_restart_all_streams handler, lowered by the restarter.
@waiting = false
# Restart worker. When a request is pending it fires :sse_kill_all (passing
# :worldon_start_all_streams along) and records the restart time. In every
# case it then lowers the flag and hands itself to Reserver.new(60, ...).
restarter = Proc.new do
if @waiting
Plugin.call(:sse_kill_all, :worldon_start_all_streams)
atomic {
@last_all_restarted = Time.new
@waiting = false
}
end
# NOTE(review): this second reset looks redundant with the one above, and a
# request raised between the two atomic sections would be lowered without a
# restart -- confirm this is intended.
atomic {
@waiting = false
}
# Presumably re-runs this proc in 60 seconds -- confirm Reserver semantics.
# NOTE(review): every run arms a new Reserver, including runs triggered directly by the event handler.
Reserver.new(60, &restarter)
end
# Requests a restart of every stream. The request flag is always raised; the
# restarter is invoked right away only when at least 60 seconds have passed
# since the last recorded restart.
on_worldon_restart_all_streams do
  requested_at = Time.new
  atomic { @waiting = true }
  restarter.call unless (requested_at - @last_all_restarted) < 60
end
# Starts every stream: one background thread per world (refresh mutes, then
# fire :worldon_init_auth_stream) and one :worldon_init_instance_stream per
# configured instance domain.
on_worldon_start_all_streams do
  worlds, = Plugin.filtering(:worldon_worlds, nil)
  worlds.each do |world|
    Thread.new {
      world.update_mutes!
      Plugin.call(:worldon_init_auth_stream, world)
    }
  end

  # Iterated only for its side effect, so `each` rather than `map`; the
  # per-instance setting is not needed here.
  UserConfig[:worldon_instances].each do |domain, _setting|
    Plugin.call(:worldon_init_instance_stream, domain)
  end
end
# Restarts the instance streams as needed.
# Persists the new +retrieve+ setting when it differs from the stored one, tears
# the instance streams down, and brings them back up only when +retrieve+ is truthy.
on_worldon_restart_instance_stream do |domain, retrieve = true|
  Thread.new do
    instance = pm::Instance.load(domain)
    unless instance.retrieve == retrieve
      instance.retrieve = retrieve
      instance.store
    end

    Plugin.call(:worldon_remove_instance_stream, domain)
    Plugin.call(:worldon_init_instance_stream, domain) if retrieve
  end
end
# Registers the datasources of +domain+ and starts whichever of its public
# streams are referenced by an extract tab. Runs in a background thread.
on_worldon_init_instance_stream do |domain|
  Thread.new do
    pm::Instance.load(domain)
    pm::Instance.add_datasources(domain)

    # [stream type, datasource slug, include_all flag for datasource_used?]
    candidates = [
      ['public',             pm::Instance.datasource_slug(domain, :federated),       true],
      ['public:media',       pm::Instance.datasource_slug(domain, :federated_media), false],
      ['public:local',       pm::Instance.datasource_slug(domain, :local),           false],
      ['public:local:media', pm::Instance.datasource_slug(domain, :local_media),     false],
    ]

    # Start streams
    candidates.each do |stream_type, stream_slug, include_all|
      next unless datasource_used?(stream_slug, include_all)

      Plugin.call(:worldon_start_stream, domain, stream_type, stream_slug)
    end
  end
end
# Stops every instance-level stream of +domain+ and unregisters its datasources.
on_worldon_remove_instance_stream do |domain|
  # All four kinds that :worldon_init_instance_stream may start, so the media
  # variants are stopped too rather than being left running.
  %i[federated federated_media local local_media].each do |kind|
    Plugin.call(:worldon_stop_stream, pm::Instance.datasource_slug(domain, kind))
  end
  pm::Instance.remove_datasources(domain)
end
# Sets up the authenticated streams of one world in a background thread:
# registers an extract-datasources filter that adds its home / DM / list
# datasources, then starts the streams that an extract tab actually uses.
on_worldon_init_auth_stream do |world|
  Thread.new do
    lists = world.get_lists!

    filter_extract_datasources do |dss|
      pm::Instance.load(world.domain)
      own = {
        world.datasource_slug(:home) => "Mastodonホームタイムライン(Worldon)/#{world.account.acct}",
        world.datasource_slug(:direct) => "Mastodon DM(Worldon)/#{world.account.acct}",
      }
      lists.to_a.each do |l|
        own[world.datasource_slug(:list, l[:id])] = "Mastodonリスト(Worldon)/#{world.account.acct}/#{l[:title]}"
      end
      # Entries already present in dss win over the ones added here.
      [own.merge(dss)]
    end

    # Start streams
    home_slug = world.datasource_slug(:home)
    Plugin.call(:worldon_start_stream, world.domain, 'user', home_slug, world) if datasource_used?(home_slug, true)

    direct_slug = world.datasource_slug(:direct)
    Plugin.call(:worldon_start_stream, world.domain, 'direct', direct_slug, world) if datasource_used?(direct_slug, true)

    lists.to_a.each do |list|
      list_id = list[:id].to_i
      list_slug = world.datasource_slug(:list, list_id)
      next unless datasource_used?(list_slug)

      Plugin.call(:worldon_start_stream, world.domain, 'list', list_slug, world, list_id)
    end
  end
end
# Tears down the authenticated streams of one world: stops the home, DM and
# per-list streams, then registers a filter that drops their slugs from the
# extract datasources.
on_worldon_remove_auth_stream do |world|
  fixed_slugs = [world.datasource_slug(:home), world.datasource_slug(:direct)]
  list_slugs = world.get_lists!.to_a.map { |list| world.datasource_slug(:list, list[:id].to_i) }
  slugs = fixed_slugs + list_slugs

  slugs.each { |slug| Plugin.call(:worldon_stop_stream, slug) }

  filter_extract_datasources do |datasources|
    slugs.each { |slug| datasources.delete(slug) }
    [datasources]
  end
end
# Reconnects one SSE stream in a background thread: looks up the connection
# registered under +slug+, kills it, waits a random 3-10 seconds, and creates
# it again with the same uri / headers / params / opts.
on_worldon_restart_sse_stream do |slug|
  Thread.new do
    connection, = Plugin.filtering(:sse_connection, slug)
    # Already terminated
    next if connection.nil?

    Plugin.call(:sse_kill_connection, slug)
    sleep(rand(3..10))

    uri = connection[:uri]
    headers = connection[:headers]
    params = connection[:params]
    opts = connection[:opts]
    Plugin.call(:sse_create, slug, :get, uri, headers, params, opts)
  end
end
# Logs at notice level that the connection for a slug is opening.
on_sse_connection_opening { |slug| notice "SSE: connection open for #{slug.to_s}" }
# Reacts to a failed connection attempt: logs it, dumps the response when
# Mopt.error_level is at least 1, then either gives up or retries.
on_sse_connection_failure do |slug, response|
  error "SSE: connection failure for #{slug.to_s}"
  pp response if Mopt.error_level >= 1
  $stdout.flush

  # 4xx responses are terminal: stop without retrying. Anything else reconnects.
  client_error = (response.status / 100) == 4
  follow_up = client_error ? :sse_kill_connection : :worldon_restart_sse_stream
  Plugin.call(follow_up, slug)
end
# A closed connection is logged as a warning and then reconnected.
on_sse_connection_closed do |closed_slug|
  warn("SSE: connection closed for #{closed_slug.to_s}")
  Plugin.call(:worldon_restart_sse_stream, closed_slug)
end
# A connection error is logged (the exception itself is dumped when
# Mopt.error_level is at least 1) and the stream is reconnected.
on_sse_connection_error do |slug, exception|
  error "SSE: connection error for #{slug.to_s}"
  pp(exception) unless Mopt.error_level < 1
  Plugin.call(:worldon_restart_sse_stream, slug)
end
# Parses an "update" payload (JSON text, keys symbolized) and passes it to
# update_handler on a background thread.
on_sse_on_update do |slug, json|
  Thread.new do
    update_handler(slug, JSON.parse(json, symbolize_names: true))
  end
end
# Parses a "notification" payload (JSON text, keys symbolized) and passes it to
# notification_handler on a background thread.
on_sse_on_notification do |slug, json|
  Thread.new do
    notification_handler(slug, JSON.parse(json, symbolize_names: true))
  end
end
# Intentionally a no-op: "delete" events from the stream are ignored.
on_sse_on_delete do |slug, id|
# Is there any need to delete?
# pawoo has an extension that lets a toot disappear automatically after a set time (1 minute to 7 days).
# Some people also delete manually right away.
# The premise seems to be that such a toot cannot be accessed afterwards,
# but whatever flowed through the timeline was visible to the followers.
# So we do not delete it.
end
# When the plugin is unloaded, ask the SSE layer to kill all connections.
on_unload { Plugin.call(:sse_kill_all) }
# Finds the world a stream belongs to.
#
# domain       - instance domain of the stream
# access_token - token the stream was opened with
#
# Returns the first world from the :worldon_worlds filter whose domain and
# access token both match, or nil when there is none (or no worlds at all).
def stream_world(domain, access_token)
  # The filter hands back the worlds list as its first value. It is searched
  # directly: an Enumerator block that never feeds its yielder produces no
  # elements, so wrapping the list in one would make every lookup return nil.
  worlds, = Plugin.filtering(:worldon_worlds, nil)
  Array(worlds).find do |world|
    world.domain == domain && world.access_token == access_token
  end
end
# Handles one parsed "update" payload received on a stream.
#
# datasource_slug - slug the SSE connection is registered under
# payload         - the parsed status hash (symbol keys)
#
# Builds a status from the payload and fires :extract_receive_message and
# :update for it. When the status is a boost (its reblog is a pm::Status) it
# also fires :retweet, plus :mention when the status has a to_me_world.
# Returns nil; does nothing if the connection is gone or no status is built.
def update_handler(datasource_slug, payload)
  pm = Plugin::Worldon
  connection, = Plugin.filtering(:sse_connection, datasource_slug)
  # This runs on its own thread, so the connection may have been killed in the
  # meantime; the restart handler treats nil from this filter the same way.
  return if connection.nil?

  opts = connection[:opts]
  domain = opts[:domain]
  access_token = opts[:token]

  status = pm::Status.build(domain, [payload]).first
  return if status.nil?

  Plugin.call(:extract_receive_message, datasource_slug, [status])
  Plugin.call(:update, stream_world(domain, access_token), [status])

  return unless status.reblog.is_a?(pm::Status)

  Plugin.call(:retweet, [status])
  mentioned_world = status.to_me_world
  Plugin.call(:mention, mentioned_world, [status]) unless mentioned_world.nil?
  nil
end
# Handles one parsed "notification" payload received on a stream.
#
# datasource_slug - slug the SSE connection is registered under
# payload         - the parsed notification hash (symbol keys); payload[:type]
#                   selects the branch
#
# 'mention'   - builds the status, fires :extract_receive_message, and fires
#               :mention when the status has a to_me_world
# 'reblog'    - fetches the boosting account's statuses, picks the one whose
#               reblog URI equals the notified status URI, fires :retweet
#               (and :mention when it has a to_me_world)
# 'favourite' - records the favouriting acct on the status, optionally bumps
#               its modified time, and fires :favorite when the status has a
#               from_me_world
# 'follow'    - fires :followers_created for the stream's world, if found
# other       - warns, and dumps the payload when Mopt.error_level is >= 2
#
# Does nothing if the connection is already gone.
def notification_handler(datasource_slug, payload)
  pm = Plugin::Worldon
  connection, = Plugin.filtering(:sse_connection, datasource_slug)
  # This runs on its own thread, so the connection may have been killed in the
  # meantime; the restart handler treats nil from this filter the same way.
  return if connection.nil?

  domain = connection[:opts][:domain]
  access_token = connection[:opts][:token]

  case payload[:type]
  when 'mention'
    status = pm::Status.build(domain, [payload[:status]]).first
    return if status.nil?

    Plugin.call(:extract_receive_message, datasource_slug, [status])
    world = status.to_me_world
    Plugin.call(:mention, world, [status]) unless world.nil?

  when 'reblog'
    user_id = payload[:account][:id]
    user_statuses = pm::API.call(:get, domain, "/api/v1/accounts/#{user_id}/statuses", access_token)
    if user_statuses.nil?
      error "Worldon: ブーストStatusの取得に失敗"
      return
    end

    # Search the same collection (`value`) that is then used for the lookup.
    boost = user_statuses.value.find do |hash|
      hash[:reblog] && hash[:reblog][:uri] == payload[:status][:uri]
    end
    if boost.nil?
      error "Worldon: ブーストStatusの取得に失敗(流れ速すぎ?)"
      return
    end

    status = pm::Status.build(domain, [boost]).first
    return if status.nil?

    Plugin.call(:retweet, [status])
    world = status.to_me_world
    Plugin.call(:mention, world, [status]) if world

  when 'favourite'
    user = pm::Account.new(payload[:account])
    status = pm::Status.build(domain, [payload[:status]]).first
    return if status.nil?

    status.favorite_accts << user.acct
    world = status.from_me_world
    # world may be nil here; `&.` keeps the "favourited by someone other than
    # this world's user" test from raising in that case.
    if UserConfig[:favorited_by_anyone_age] && (UserConfig[:favorited_by_myself_age] || world&.user_obj != user)
      status.set_modified(Time.now.localtime)
    end
    Plugin.call(:favorite, world, user, status) if world

  when 'follow'
    user = pm::Account.new payload[:account]
    world = stream_world(domain, access_token)
    Plugin.call(:followers_created, world, [user]) unless world.nil?

  else
    # Unknown notification
    warn 'unknown notification'
    pp payload if Mopt.error_level >= 2
    $stdout.flush
  end
end
end