Skip to content

Commit

Permalink
Merge pull request #30 from ahrefs/no-relaunch-0
Browse files Browse the repository at this point in the history
parallel: add option to not revive upon clear termination
  • Loading branch information
yasunariw authored Oct 25, 2024
2 parents cb6d2c6 + 7562d2b commit dc07cd9
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 30 deletions.
90 changes: 62 additions & 28 deletions parallel.ml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ open Prelude

let log = Log.from "parallel"

(** Policy deciding whether a worker that has exited is launched again
    with the same argument. *)
type revive_mode =
| Never (** never revive a worker; exited workers are only dropped from the table *)
| On_failure (** revive unless the worker is known to have exited with code 0 *)
| Always (** revive a worker regardless of how it exited *)

module type WorkerT = sig
type task
type result
Expand All @@ -18,15 +23,29 @@ val perform : t -> ?autoexit:bool -> task Enum.t -> (result -> unit) -> unit
val stop : ?wait:int -> t -> unit
end

(** @return list of reaped and live pids *)
(** @return list of reaped PIDs with status, list of live PIDs *)
let reap l =
let open Unix in
List.partition (fun pid ->
try
pid = fst (waitpid [WNOHANG] pid)
with
| Unix_error (ECHILD,_,_) -> true (* exited *)
| exn -> log #warn ~exn "Worker PID %d lost (wait)" pid; true) l
List.fold_left begin fun (acc_dead, acc_alive) pid ->
let dead result = (pid, result) :: acc_dead, acc_alive in
let alive () = acc_dead, pid :: acc_alive in
match waitpid [WNOHANG] pid with
| 0, _ ->
(* due to NOHANG *)
alive ()
| v, _ when v != pid ->
log #warn "Unexpected waitpid value %d for PID %d" v pid;
alive ()
| v, result ->
assert (v = pid);
dead (Some result)
| exception Unix_error (ECHILD,_,_) ->
dead None
(* exited *)
| exception exn ->
log #warn ~exn "Worker PID %d lost (wait)" pid;
dead None
end ([], []) l

let hard_kill1 pid =
let open Unix in
Expand Down Expand Up @@ -230,21 +249,42 @@ let rec launch_forks f = function
| `Forked _ -> launch_forks f xs

(** keep the specified number of workers running *)
let run_forks_simple ?(revive=false) ?wait_stop f args =
let run_forks_simple ?(revive=Never) ?wait_stop f args =
let workers = Hashtbl.create 1 in
let launch f x =
match Nix.fork () with
| `Child ->
let () = try f x with exn -> log #error ~exn ~backtrace:true "worker failed" in
exit 0
begin try
f x;
exit 0
with exn ->
log #error ~exn ~backtrace:true "worker failed";
exit 1
end
| `Forked pid -> Hashtbl.add workers pid x; pid
in
args |> List.iter (fun x -> let (_:int) = launch f x in ());
let pids () = Hashtbl.keys workers |> List.of_enum in
(* Process the list of reaped workers ([pid], optional wait status).
   Each known worker is removed from the [workers] table and then either
   left dead or relaunched with the argument it was originally started with.
   - [always = true]: relaunch whatever the exit status was.
   - [always = false]: skip the relaunch only for a clean [WEXITED 0];
     any other status (or an unknown one, [None]) triggers a relaunch.
   PIDs that are not in the table are only reported with a warning. *)
let maybe_revive ~always dead =
  dead |> List.iter begin fun (pid, result) ->
    match Hashtbl.find workers pid with
    | exception Not_found -> log #warn "WUT? Not my worker %d" pid
    | x ->
      (* forget the dead PID first, [launch] registers the replacement itself *)
      Hashtbl.remove workers pid;
      match result with
      | Some (Unix.WEXITED 0) when not always ->
        (* clean exit and revival was requested on failure only: do not relaunch *)
        log #info "worker %d exited" pid
      | _ ->
        match launch f x with
        | exception exn -> log #error ~exn "restart"
        | pid' ->
          (* note the leading space: the suffix is appended right after "exited" *)
          log #info "worker %d exited%s, replaced with %d" pid (if always then "" else " with non-zero status") pid'
  end
in
let rec loop pause =
Nix.sleep pause;
let total = Hashtbl.length workers in
if total = 0 && not revive then
if total = 0 && revive <> Always then
log #info "All workers dead, stopping"
else
match Daemon.should_exit () with
Expand All @@ -256,23 +296,17 @@ let run_forks_simple ?(revive=false) ?wait_stop f args =
end
| false ->
let (dead,_live) = reap (pids ()) in
match dead with
| [] -> loop (max 1. (pause /. 2.))
| dead when revive ->
let pause = min 10. (pause *. 1.5) in
dead |> List.iter begin fun pid ->
match Hashtbl.find workers pid with
| exception Not_found -> log #warn "WUT? Not my worker %d" pid
| x ->
Hashtbl.remove workers pid;
match launch f x with
| exception exn -> log #error ~exn "restart"
| pid' -> log #info "worker %d exited, replaced with %d" pid pid';
end;
loop pause
| dead ->
log #info "%d child workers exited (PIDs: %s)" (List.length dead) (Stre.list string_of_int dead);
List.iter (Hashtbl.remove workers) dead;
match dead, revive with
| [], _ -> loop (max 1. (pause /. 2.))
| dead, Always ->
maybe_revive ~always:true dead;
loop (min 10. (pause *. 1.5))
| dead, On_failure ->
maybe_revive ~always:false dead;
loop (min 10. (pause *. 1.5))
| dead, Never ->
log #info "%d child workers exited (PIDs: %s)" (List.length dead) (Stre.list (string_of_int $ fst) dead);
List.iter (Hashtbl.remove workers $ fst) dead;
loop pause
in
loop 1.
Expand Down
9 changes: 7 additions & 2 deletions parallel.mli
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
(** Parallel *)

type revive_mode =
| Never (** never revive worker *)
| On_failure (** revive when worker exits with non-zero code *)
| Always (** revive worker regardless of exit code *)

(** Invoke function in a forked process and return result *)
val invoke : ('a -> 'b) -> 'a -> unit -> 'b

Expand All @@ -9,9 +14,9 @@ val launch_forks : ('a -> unit) -> 'a list -> unit

(** Launch forks for each element of the list and wait for all workers to finish.
Pass exit signals to the workers, see {!Forks.stop} for the description of [wait_stop] parameter.
@param revive to keep workers running (restarting with same param if exited) [default: false]
@param revive controls whether exited workers are restarted (with the same param), see {!revive_mode} [default: Never]
*)
val run_forks : ?wait_stop:int -> ?revive:bool -> ?wait:int -> ?workers:int -> ('a -> unit) -> 'a list -> unit
val run_forks : ?wait_stop:int -> ?revive:revive_mode -> ?wait:int -> ?workers:int -> ('a -> unit) -> 'a list -> unit

(** Same as [run_forks] but do not fork for one worker *)
val run_forks' : ('a -> unit) -> 'a list -> unit
Expand Down

0 comments on commit dc07cd9

Please sign in to comment.