-
Notifications
You must be signed in to change notification settings - Fork 10
/
extThread.ml
175 lines (144 loc) · 4.31 KB
/
extThread.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
let log = Log.self
type 'a t = [ `Exn of exn | `None | `Ok of 'a ] ref * Thread.t
let detach f x =
let result = ref `None in
result, Thread.create (fun () -> result := Exn.map f x) ()
let join (result,thread) = Thread.join thread; match !result with `None -> assert false | (`Ok _ | `Exn _ as x) -> x
let join_exn t = match join t with `Ok x -> x | `Exn exn -> raise exn
let map f a = Array.map join_exn @@ Array.map (detach f) a
let mapn ?(n=8) f l =
assert (n > 0);
Action.distribute n l |> map (List.map @@ Exn.map f) |> Action.undistribute
let locked mutex f = Mutex.lock mutex; Std.finally (fun () -> Mutex.unlock mutex) f ()
module LockMutex = struct
type t = Mutex.t
let create = Mutex.create
let locked = locked
end
module Async_fin = struct
open Async
module U = ExtUnix.All
type t = { q : (unit -> unit) Mtq.t; evfd : Unix.file_descr; }
let is_available () = ExtUnix.Config.have `EVENTFD
let setup events =
let fin = { q = Mtq.create (); evfd = U.eventfd 0; } in
let rec loop () =
match Mtq.try_get fin.q with
| None -> ()
| Some f -> begin try f () with exn -> log #warn ~exn "fin loop" end; loop ()
in
let reset fd =
try
ignore (U.eventfd_read fd)
with
| Unix.Unix_error (Unix.EAGAIN, _, _) -> ()
| exn -> log #warn ~exn "fin reset"; ()
in
setup_simple_event events fin.evfd [Ev.READ] begin fun _ fd _ -> reset fd; loop () end;
fin
let shutdown { q; evfd } = Mtq.clear q; Unix.close evfd
let callback fin f =
Mtq.put fin.q f;
U.eventfd_write fin.evfd 1L
end
let log_create ?name f x = Thread.create (fun () -> Action.log ?name f x) ()
let run_periodic ~delay ?(now=false) f =
let (_:Thread.t) = Thread.create begin fun () ->
if not now then Nix.sleep delay;
while try f () with exn -> Log.self #warn ~exn "ExtThread.run_periodic"; true do
Nix.sleep delay
done
end ()
in
()
module type WorkerT = sig
type task
type result
end
module type Workers = sig
type task
type result
type t
val create : (task -> result) -> int -> t
val perform : t -> ?autoexit:bool -> task Enum.t -> (result -> unit) -> unit
val stop : ?wait:int -> t -> unit
end
module Workers(T:WorkerT) =
struct
type task = T.task
type result = T.result
type t = task Mtq.t * result Mtq.t * int
let worker qi f qo =
while true do
Mtq.put qo (f (Mtq.get qi))
done
let stop ?wait:_ (qi,_,_) = Mtq.clear qi
let create f n =
let qi = Mtq.create () and qo = Mtq.create () in
for _ = 1 to n do
ignore (Thread.create (fun () -> worker qi f qo) ())
done;
qi,qo,n
let perform (qi,qo,n) ?autoexit:_ e f =
let active = ref 0 in
for _ = 1 to n do
match Enum.get e with
| Some x -> Mtq.put qi x; incr active
| None -> ()
done;
while !active > 0 do
let res = Mtq.get qo in
begin match Enum.get e with
| Some x -> Mtq.put qi x
| None -> decr active
end;
f res
done
end
let atomic_incr = incr
let atomic_decr = decr
let atomic_get x = !x
module Pool = struct
type t = { q : (unit -> unit) Mtq.t;
total : int;
free : int ref;
mutable blocked : bool;
}
let create n =
let t = { q = Mtq.create (); total = n; free = ref (-1); blocked = false;} in t
let init t =
let worker _i =
while true do
let f = Mtq.get t.q in
atomic_decr t.free;
begin try f () with exn -> log #warn ~exn "ThreadPool" end;
atomic_incr t.free;
done
in
t.free := t.total;
for i = 1 to t.total do
let (_:Thread.t) = log_create worker i in ()
done
let status t = Printf.sprintf "queue %d threads %d of %d"
(Mtq.length t.q) (atomic_get t.free) t.total
let put t =
if atomic_get t.free = -1 then init t;
while t.blocked do
Nix.sleep 0.05
done;
Mtq.put t.q
let wait_blocked ?(n=0) t =
if (atomic_get t.free <> -1) then begin
while t.blocked do Nix.sleep 0.05 done;(* Wait for unblock *)
t.blocked <- true;
assert(n>=0);
let i = ref 1 in
while Mtq.length t.q + (t.total - atomic_get t.free)> n do (* Notice that some workers can be launched! *)
if !i = 100 || !i mod 1000 = 0 then
log #info "Thread Pool - waiting block : %s" (status t);
Nix.sleep 0.05;
incr i
done;
t.blocked <- false
end
end