Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve worker cleanup on early coordinator exit #5152

Merged
merged 1 commit into from
Jul 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/couch_replicator/src/couch_replicator_fabric.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

docs(DbName, Options, QueryArgs, Callback, Acc) ->
Shards = mem3:shards(DbName),
Workers0 = fabric_util:submit_jobs(
Workers0 = fabric_streams:submit_jobs(
Shards, couch_replicator_fabric_rpc, docs, [Options, QueryArgs]
),
RexiMon = fabric_util:create_monitors(Workers0),
Expand Down
113 changes: 102 additions & 11 deletions src/fabric/src/fabric_streams.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
-module(fabric_streams).

-export([
submit_jobs/4,
start/2,
start/3,
start/4,
Expand All @@ -27,6 +28,23 @@

-define(WORKER_CLEANER, fabric_worker_cleaner).

% This is the streams equivalent of fabric_util:submit_jobs/4. Besides
% submitting the jobs it also makes sure a worker cleaner exists and that each
% job's {Node, Ref} is handed to the cleaner before the job is submitted.
%
% Returns the input shards with their `ref` field set to the reference used
% for the corresponding rexi cast.
%
submit_jobs(Shards, Module, EndPoint, ExtraArgs) ->
    % Create refs first and add them to the cleaner to ensure if our process
    % gets killed, the remote workers will be cleaned up as well.
    RefFun = fun(#shard{} = Shard) -> Shard#shard{ref = make_ref()} end,
    Workers = lists:map(RefFun, Shards),
    ClientReq = chttpd_util:mochiweb_client_req_get(),
    Coordinator = self(),
    spawn_worker_cleaner(Coordinator, Workers, ClientReq),
    % spawn_worker_cleaner/3 hands back an already running cleaner untouched,
    % so when this function is called again from the same process the new refs
    % would not be known to it. Register them explicitly before casting. The
    % cleaner keeps node refs in a set, so re-adding the refs a freshly spawned
    % cleaner was seeded with changes nothing.
    AddFun = fun(#shard{} = Worker) ->
        add_worker_to_cleaner(Coordinator, Worker)
    end,
    ok = lists:foreach(AddFun, Workers),
    SubmitFun = fun(#shard{node = Node, name = ShardName, ref = Ref}) ->
        rexi:cast_ref(Ref, Node, {Module, EndPoint, [ShardName | ExtraArgs]})
    end,
    ok = lists:foreach(SubmitFun, Workers),
    Workers.

% Convenience arity: delegates to start/4, passing `undefined` for both of the
% remaining arguments.
start(Workers0, KeyPosition) ->
    start(Workers0, KeyPosition, undefined, undefined).

Expand Down Expand Up @@ -158,39 +176,49 @@ handle_stream_start(Else, _, _) ->
% Spawn an auxiliary rexi worker cleaner. This will be used in cases
% when the coordinator (request) process is forcibly killed and doesn't
% get a chance to process its `after` fabric:clean/1 clause.
%
% At most one cleaner is kept per calling process: its pid is stored in the
% process dictionary under ?WORKER_CLEANER and returned as-is on later calls,
% in which case `Workers` is not added to it. Returns the cleaner pid.
spawn_worker_cleaner(Coordinator, Workers, ClientReq) when
    is_pid(Coordinator), is_list(Workers)
->
    case get(?WORKER_CLEANER) of
        undefined ->
            % Plain spawn (not linked): the cleaner has to outlive a killed
            % coordinator so it can react to the 'DOWN' message.
            Pid = spawn(fun() ->
                erlang:monitor(process, Coordinator),
                NodeRefSet = set_from_list(shards_to_node_refs(Workers)),
                cleaner_loop(Coordinator, NodeRefSet, ClientReq)
            end),
            put(?WORKER_CLEANER, Pid),
            Pid;
        ExistingCleaner when is_pid(ExistingCleaner) ->
            ExistingCleaner
    end.

% Receive loop of the cleaner process.
%
% Pid        - the monitored coordinator process
% NodeRefSet - set of {Node, Ref} pairs to kill when the coordinator goes down
% ClientReq  - passed to chttpd_util:stop_client_process_if_disconnected/2
%
% The loop ends, and with it the cleaner process, once the coordinator's
% 'DOWN' message has been handled.
cleaner_loop(Pid, NodeRefSet, ClientReq) ->
    CheckMSec = chttpd_util:mochiweb_client_req_check_msec(),
    receive
        % Only additions sent on behalf of our coordinator are accepted.
        {add_node_ref, Pid, {_, _} = NodeRef} ->
            cleaner_loop(Pid, sets:add_element(NodeRef, NodeRefSet), ClientReq);
        {'DOWN', _, _, Pid, _} ->
            rexi:kill_all(sets:to_list(NodeRefSet))
    after CheckMSec ->
        % Nothing arrived within CheckMSec: run the client disconnect check
        % and keep waiting.
        chttpd_util:stop_client_process_if_disconnected(Pid, ClientReq),
        cleaner_loop(Pid, NodeRefSet, ClientReq)
    end.

% Tell the calling process's cleaner (if any) about one more worker.
%
% Sends the worker's {Node, Ref} to the cleaner pid stored in the process
% dictionary under ?WORKER_CLEANER. When no cleaner pid is stored there this
% is a no-op returning `ok`.
add_worker_to_cleaner(CoordinatorPid, #shard{node = Node, ref = Ref}) ->
    case get(?WORKER_CLEANER) of
        CleanerPid when is_pid(CleanerPid) ->
            CleanerPid ! {add_node_ref, CoordinatorPid, {Node, Ref}};
        _ ->
            ok
    end.

% Build a `sets` set from a list, explicitly requesting the version 2
% representation.
set_from_list(Elements) when is_list(Elements) ->
    SetOpts = [{version, 2}],
    sets:from_list(Elements, SetOpts).

% Map a list of #shard{} records to their {Node, Ref} pairs, keeping order.
shards_to_node_refs(Shards) when is_list(Shards) ->
    lists:map(fun(#shard{node = N, ref = R}) -> {N, R} end, Shards).

-ifdef(TEST).

-include_lib("couch/include/couch_eunit.hrl").
Expand All @@ -207,7 +235,8 @@ worker_cleaner_test_() ->
?TDEF_FE(does_not_fire_if_cleanup_called),
?TDEF_FE(should_clean_additional_worker_too),
?TDEF_FE(coordinator_is_killed_if_client_disconnects),
?TDEF_FE(coordinator_is_not_killed_if_client_is_connected)
?TDEF_FE(coordinator_is_not_killed_if_client_is_connected),
?TDEF_FE(submit_jobs_sets_up_cleaner)
]
}
}.
Expand Down Expand Up @@ -351,6 +380,68 @@ coordinator_is_not_killed_if_client_is_connected(_) ->
{'DOWN', CleanerRef, _, _, _} -> ok
end.

% Checks that submit_jobs/4 leaves a live cleaner process behind and that,
% once the coordinator is killed, the cleaner calls rexi:kill_all/1 with the
% {Node, Ref} pairs of exactly the submitted workers and then exits normally.
submit_jobs_sets_up_cleaner(_) ->
% Start from an empty rexi call history and make sure no cleaner pid is left
% in the test process's dictionary.
meck:reset(rexi),
erase(?WORKER_CLEANER),
Shards = [
#shard{node = 'n1'},
#shard{node = 'n2'}
],
% Stub the cast so it just returns the ref it was given.
meck:expect(rexi, cast_ref, fun(Ref, _, _) -> Ref end),
% The coordinator is a separate process: it submits the jobs, reports its
% workers and the cleaner pid from its own process dictionary on request, and
% then sleeps so it is still alive when the test kills it.
{Coord, CoordRef} = spawn_monitor(fun() ->
Workers = submit_jobs(Shards, fabric_rpc, potatoes, []),
receive
{get_workers_and_cleaner, From} ->
From ! {Workers, get(?WORKER_CLEANER)},
timer:sleep(999999)
end
end),
Coord ! {get_workers_and_cleaner, self()},
% NOTE(review): this receive has no `after` clause and takes the first message
% in the mailbox, whatever it is.
{Workers, Cleaner} =
receive
Msg -> Msg
end,
?assert(is_pid(Cleaner)),
?assert(is_process_alive(Cleaner)),
?assert(is_process_alive(Coord)),
% Every returned worker must carry a reference; collect the {Node, Ref} pairs
% for the comparisons below.
CheckWorkerFun = fun(#shard{node = Node, ref = Ref}) ->
?assert(is_reference(Ref)),
{Node, Ref}
end,
NodeRefs = lists:map(CheckWorkerFun, Workers),
?assertEqual(length(Shards), length(Workers)),
% All {Node, Ref} pairs are distinct.
?assertEqual(length(lists:usort(NodeRefs)), length(NodeRefs)),
% Were the jobs actually submitted? Expect two cast_ref calls within 1 second.
meck:wait(2, rexi, cast_ref, '_', 1000),
% If we kill the coordinator, the cleaner should kill the workers
% The history is reset first so that only calls made after the kill remain.
meck:reset(rexi),
CleanupMon = erlang:monitor(process, Cleaner),
exit(Coord, kill),
receive
{'DOWN', CoordRef, _, _, WorkerReason} ->
?assertEqual(killed, WorkerReason)
after 1000 ->
% NOTE(review): on timeout this only asserts that the coordinator is still
% alive, so this branch passes when the kill has not taken effect yet.
?assert(is_process_alive(Coord))
end,
% Cleaner should do the cleanup
meck:wait(1, rexi, kill_all, '_', 1000),
History = meck:history(rexi),
% Exactly one rexi call is recorded since the reset: a kill_all returning ok.
?assertMatch([{_, {rexi, kill_all, _}, ok}], History),
[{Pid, {rexi, kill_all, Args}, ok}] = History,
% It was the cleaner who called it
?assertEqual(Cleaner, Pid),
% kill_all got a single argument: a list of two 2-tuples.
?assertMatch([[{_, _}, {_, _}]], Args),
[NodeRefsKilled] = Args,
% The node refs killed are the ones we expect
% Both sides are sorted, so the order of the killed list does not matter.
?assertEqual(lists:sort(NodeRefs), lists:sort(NodeRefsKilled)),
% Cleanup process should exit when done
receive
{'DOWN', CleanupMon, _, _, CleanerReason} ->
?assertEqual(normal, CleanerReason)
after 1000 ->
% NOTE(review): same pattern as above; a cleaner that is still running after
% 1 second does not fail the test here.
?assert(is_process_alive(Cleaner))
end.

setup() ->
ok = meck:expect(rexi, kill_all, fun(_) -> ok end),
% Speed up disconnect socket timeout for the test to 200 msec
Expand Down
2 changes: 1 addition & 1 deletion src/fabric/src/fabric_view_all_docs.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ go(Db, Options, #mrargs{keys = undefined} = QueryArgs, Callback, Acc) ->
{CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(QueryArgs),
DbName = fabric:dbname(Db),
{Shards, RingOpts} = shards(Db, QueryArgs),
Workers0 = fabric_util:submit_jobs(
Workers0 = fabric_streams:submit_jobs(
Shards, fabric_rpc, all_docs, [Options, WorkerArgs]
),
RexiMon = fabric_util:create_monitors(Workers0),
Expand Down
4 changes: 2 additions & 2 deletions src/fabric/src/fabric_view_map.erl
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ go(Db, Options, DDoc, View, Args0, Callback, Acc, VInfo) ->
Repls = fabric_ring:get_shard_replacements(DbName, Shards),
RPCArgs = [DocIdAndRev, View, WorkerArgs, Options],
StartFun = fun(Shard) ->
hd(fabric_util:submit_jobs([Shard], fabric_rpc, map_view, RPCArgs))
hd(fabric_streams:submit_jobs([Shard], fabric_rpc, map_view, RPCArgs))
end,
Workers0 = fabric_util:submit_jobs(Shards, fabric_rpc, map_view, RPCArgs),
Workers0 = fabric_streams:submit_jobs(Shards, fabric_rpc, map_view, RPCArgs),
RexiMon = fabric_util:create_monitors(Workers0),
try
case
Expand Down
4 changes: 2 additions & 2 deletions src/fabric/src/fabric_view_reduce.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ go(Db, DDoc, VName, Args, Callback, Acc, VInfo) ->
fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, VName, Args),
Repls = fabric_ring:get_shard_replacements(DbName, Shards),
StartFun = fun(Shard) ->
hd(fabric_util:submit_jobs([Shard], fabric_rpc, reduce_view, RPCArgs))
hd(fabric_streams:submit_jobs([Shard], fabric_rpc, reduce_view, RPCArgs))
end,
Workers0 = fabric_util:submit_jobs(Shards, fabric_rpc, reduce_view, RPCArgs),
Workers0 = fabric_streams:submit_jobs(Shards, fabric_rpc, reduce_view, RPCArgs),
RexiMon = fabric_util:create_monitors(Workers0),
try
case
Expand Down
Loading