-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathheyfil.go
169 lines (152 loc) · 3.73 KB
/
heyfil.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package main
import (
"context"
"net/http"
"sync"
"time"
"github.com/ipfs/go-log/v2"
"github.com/ybbus/jsonrpc/v3"
)
// logger is the package-level logger, created under the name "heyfil".
var logger = log.Logger("heyfil")
// heyFil holds the state of the service: the resolved options, the JSON-RPC
// client, the latest known targets, the queues used to hand targets to and from
// the checkers, and the metrics and API servers.
type heyFil struct {
	// options is embedded, so its fields are accessible directly on heyFil.
	*options

	// c is the JSON-RPC client.
	c jsonrpc.RPCClient

	// targetsMutex guards targets.
	targetsMutex sync.RWMutex
	// targets maps a target ID to the target most recently recorded for it.
	targets map[string]*Target

	// NOTE(review): recentPiecesMutex presumably guards recentPieces — confirm
	// against the code that reads and writes recentPieces.
	recentPiecesMutex sync.RWMutex
	recentPieces      recentPieces

	// toCheck carries targets that are waiting to be checked.
	toCheck chan *Target
	// checked carries targets after they have been checked.
	checked chan *Target

	metrics       metrics
	dealStats     *dealStats
	metricsServer http.Server
	apiServer     http.Server
}
// newHeyFil builds a heyFil from the given options. It only constructs state;
// nothing is started here.
//
// The JSON-RPC client points at the configured API endpoint. When an API token
// is configured, the client is created with an "Authorization: Bearer <token>"
// custom header; otherwise a plain client is used.
//
// An error is returned only when the options cannot be resolved.
func newHeyFil(o ...Option) (*heyFil, error) {
	opts, err := newOptions(o...)
	if err != nil {
		return nil, err
	}

	var client jsonrpc.RPCClient
	if opts.apiToken == "" {
		client = jsonrpc.NewClient(opts.api)
	} else {
		headers := map[string]string{"Authorization": "Bearer " + opts.apiToken}
		client = jsonrpc.NewClientWithOpts(opts.api, &jsonrpc.RPCClientOpts{CustomHeaders: headers})
	}

	hf := &heyFil{
		options: opts,
		c:       client,
		targets: make(map[string]*Target),
		toCheck: make(chan *Target, 100),
		checked: make(chan *Target, 100),
	}
	// dealStats needs a back-reference to hf, so it is wired up after hf exists.
	hf.dealStats = &dealStats{hf: hf}
	return hf, nil
}
// Start brings the service up and returns as soon as the background work has
// been launched; it does not block for the lifetime of the service.
//
// Startup sequence:
//  1. Stored targets and recent pieces are loaded. A failure here is logged and
//     otherwise ignored.
//  2. Metrics, the metrics server and the API server are started, in that
//     order. The first failure aborts startup and is returned unchanged.
//  3. dealStats is started, the checker goroutines are spawned, and the check
//     dispatcher and check result handler goroutines are spawned.
//
// The dispatcher and the result handler goroutines return once ctx is done.
func (hf *heyFil) Start(ctx context.Context) error {
	// Stored state is optional: log the failure and carry on without it.
	if err := hf.loadTargets(); err != nil {
		logger.Warnw("Failed to load targets; continuing operation without pre-existing data.", "err", err)
	}
	if err := hf.loadRecentPieces(); err != nil {
		logger.Warnw("Failed to load recent pieces; continuing operation without pre-existing data.", "err", err)
	}

	// Unlike the loads above, these are required; give up on the first failure.
	if err := hf.metrics.start(); err != nil {
		return err
	}
	if err := hf.startMetricsServer(); err != nil {
		return err
	}
	if err := hf.startApiServer(); err != nil {
		return err
	}

	hf.dealStats.start(ctx)

	// Checker pool: maxConcurrentParticipantCheck goroutines consuming toCheck.
	for started := 0; started < hf.maxConcurrentParticipantCheck; started++ {
		go hf.checker(ctx)
	}

	// Check dispatcher: lists the miners and queues one target per miner, once
	// immediately and then again on every participantsCheckInterval tick.
	go func() {
		enqueueAll := func(at time.Time) {
			tlog := logger.With("t", at)
			miners, err := hf.stateListMiners(ctx)
			if err != nil {
				tlog.Errorw("failed to get state market participants", "err", err)
				return
			}
			count := len(miners)
			hf.metrics.notifyParticipantCount(int64(count))
			tlog.Infow("fetched state market participants", "count", count)
			for _, miner := range miners {
				// The send blocks while toCheck is full; stop waiting if ctx ends.
				select {
				case <-ctx.Done():
					return
				case hf.toCheck <- hf.newTarget(miner):
				}
			}
		}

		enqueueAll(time.Now())
		for {
			select {
			case <-ctx.Done():
				return
			case tick := <-hf.participantsCheckInterval.C:
				enqueueAll(tick)
			}
		}
	}()

	// Check result handler: records every checked target and reports a metrics
	// snapshot, once immediately and then on every snapshotInterval tick.
	go func() {
		report := func(at time.Time) {
			tlog := logger.With("t", at)
			hf.targetsMutex.RLock()
			hf.metrics.snapshot(hf.targets)
			total := len(hf.targets)
			hf.targetsMutex.RUnlock()
			tlog.Debugw("reported check results", "miner-count", total)
		}

		report(time.Now())
		for {
			select {
			case <-ctx.Done():
				return
			case result := <-hf.checked:
				// The write lock is held across both the map update and the store call.
				hf.targetsMutex.Lock()
				hf.targets[result.ID] = result
				if err := hf.store(result); err != nil {
					logger.Warnw("Failed to store target information", "id", result.ID, "err", err)
				}
				hf.targetsMutex.Unlock()
			case tick := <-hf.snapshotInterval.C:
				report(tick)
			}
		}
	}()

	logger.Info("heyfil started")
	return nil
}
// newTarget returns a new Target with the given ID and a back-reference to hf.
// All other fields are left at their zero value.
func (hf *heyFil) newTarget(mid string) *Target {
	target := Target{
		ID: mid,
		hf: hf,
	}
	return &target
}
// checker is the body of a single checker goroutine. It repeatedly takes a
// target from toCheck, runs its check, and sends the result to checked.
//
// It returns when ctx is done or when toCheck is closed. Cancellation is
// observed both while waiting for work and while waiting to hand over a
// result, so an idle checker does not outlive ctx.
func (hf *heyFil) checker(ctx context.Context) {
	for {
		var target *Target
		// Wait for work, but never past the end of ctx. A bare range over
		// toCheck would block here indefinitely, because cancellation does not
		// close the channel.
		select {
		case <-ctx.Done():
			return
		case next, ok := <-hf.toCheck:
			if !ok {
				// Closed channel: there will be no more work.
				return
			}
			target = next
		}

		logger := logger.With("id", target.ID)
		logger.Debug("checking target")

		// The send operand is evaluated on entering the select, so the check
		// itself runs to completion first; only the hand-over of its result can
		// be abandoned when ctx ends.
		select {
		case <-ctx.Done():
			return
		case hf.checked <- target.check(ctx):
			logger.Debug("stored check result")
		}
	}
}
// Shutdown shuts down metrics, the metrics server and the API server, in that
// order, passing ctx to each.
//
// All three shutdowns are always attempted, even when an earlier one fails.
// Only one error is returned: the first non-nil one in the order above. Later
// errors are dropped. The result is nil when all three succeed.
func (hf *heyFil) Shutdown(ctx context.Context) error {
	metricsErr := hf.metrics.shutdown(ctx)
	metricsServerErr := hf.shutdownMetricsServer(ctx)
	apiServerErr := hf.shutdownApiServer(ctx)

	switch {
	case metricsErr != nil:
		return metricsErr
	case metricsServerErr != nil:
		return metricsServerErr
	default:
		return apiServerErr
	}
}