Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve fabric streams cleanup on error and timeouts #5160

Merged
merged 1 commit into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions src/couch_replicator/src/couch_replicator_fabric.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,8 @@ docs(DbName, Options, QueryArgs, Callback, Acc) ->
after
fabric_streams:cleanup(Workers)
end;
{timeout, NewState} ->
DefunctWorkers = fabric_util:remove_done_workers(
NewState#stream_acc.workers, waiting
),
fabric_util:log_timeout(
DefunctWorkers,
"replicator docs"
),
{timeout, DefunctWorkers} ->
fabric_util:log_timeout(DefunctWorkers, "replicator docs"),
Comment on lines -41 to +38
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a big deal, but it's curious that erlfmt is fine with this on a single line now, but before it took 3.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was surprised as well. I wonder if we enforced an 80-column limit before and now we don't (or erlfmt changed). Either way, I thought it was a decent cleanup. There are probably other cases we can prettify a bit.

Callback({error, timeout}, Acc);
{error, Error} ->
Callback({error, Error}, Acc)
Expand Down
8 changes: 0 additions & 8 deletions src/fabric/include/fabric.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,6 @@
update_seq
}).

-record(stream_acc, {
workers,
ready,
start_fun,
replacements,
ring_opts
}).

-record(view_row, {key, id, value, doc, worker}).

-type row_property_key() :: id | key | value | doc | worker.
Expand Down
55 changes: 49 additions & 6 deletions src/fabric/src/fabric_streams.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,16 @@
add_worker_to_cleaner/2
]).

-include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").

% Accumulator record used while starting streams; it is read and updated by
% handle_stream_start/3 and pattern-matched in the timeout result of start/5.
-record(stream_acc, {
% List of {Worker, Status} pairs (e.g. {Shard, waiting}).
workers,
% List of pairs whose first element is a worker; handle_stream_start/3
% updates it alongside `workers`.
ready,
% NOTE(review): presumably the StartFun argument given to start/5 - confirm.
start_fun,
% NOTE(review): presumably the Replacements argument given to start/5 - confirm.
replacements,
% NOTE(review): presumably the RingOpts argument given to start/5 - confirm.
ring_opts
}).

-define(WORKER_CLEANER, fabric_worker_cleaner).

% This is the streams equivalent of fabric_util:submit_jobs/4. Besides
Expand Down Expand Up @@ -77,7 +84,12 @@ start(Workers0, Keypos, StartFun, Replacements, RingOpts) ->
Workers
),
{ok, AckedWorkers};
{timeout, #stream_acc{workers = Defunct}} ->
cleanup(Workers0),
DefunctWorkers = fabric_util:remove_done_workers(Defunct, waiting),
{timeout, DefunctWorkers};
Else ->
cleanup(Workers0),
Else
end.

Expand Down Expand Up @@ -165,10 +177,7 @@ handle_stream_start(rexi_STREAM_INIT, {Worker, From}, St) ->
{stop, St#stream_acc{workers = [], ready = Ready1}}
end
end;
handle_stream_start({ok, Error}, _, St) when Error == ddoc_updated; Error == insufficient_storage ->
WaitingWorkers = [W || {W, _} <- St#stream_acc.workers],
ReadyWorkers = [W || {W, _} <- St#stream_acc.ready],
cleanup(WaitingWorkers ++ ReadyWorkers),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this cleanup no longer necessary because of the new cleanup(Workers0) in the above Else clause?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we always do it on any non-{ok, #stream_acc{}} result, be it {ok, ddoc_updated}, {error, ...} or {timeout, ...}.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I debated for a bit adding all those explicit cases for "non-success" responses but figured it also makes sense to have a general "whatever else happens we clean up" clause.

handle_stream_start({ok, Error}, _, _) when Error == ddoc_updated; Error == insufficient_storage ->
{stop, Error};
handle_stream_start(Else, _, _) ->
exit({invalid_stream_start, Else}).
Expand Down Expand Up @@ -236,7 +245,9 @@ worker_cleaner_test_() ->
?TDEF_FE(should_clean_additional_worker_too),
?TDEF_FE(coordinator_is_killed_if_client_disconnects),
?TDEF_FE(coordinator_is_not_killed_if_client_is_connected),
?TDEF_FE(submit_jobs_sets_up_cleaner)
?TDEF_FE(submit_jobs_sets_up_cleaner),
?TDEF_FE(cleanup_called_on_timeout),
?TDEF_FE(cleanup_called_on_error)
]
}
}.
Expand Down Expand Up @@ -442,7 +453,39 @@ submit_jobs_sets_up_cleaner(_) ->
?assert(is_process_alive(Cleaner))
end.

% Checks the timeout path of start/5: rexi_utils:recv is mocked to report a
% timeout with only the second shard still marked as waiting. The test expects
% start/5 to return {timeout, [ThatShard]} and fabric_util:cleanup to have been
% called with one argument.
cleanup_called_on_timeout(_) ->
    Shard1 = #shard{node = 'n1', ref = make_ref()},
    Shard2 = #shard{node = 'n2', ref = make_ref()},
    RecvTimeout = fun(_, _, _, St, _, _) ->
        {timeout, St#stream_acc{workers = [{Shard2, waiting}]}}
    end,
    meck:expect(rexi_utils, recv, RecvTimeout),
    % Forget earlier calls so the meck:called/3 check below only sees this run.
    meck:reset(fabric_util),
    ?assertEqual(
        {timeout, [Shard2]},
        start([Shard1, Shard2], #shard.ref, undefined, undefined, [])
    ),
    ?assert(meck:called(fabric_util, cleanup, 1)).

% Checks the error path of start/5: rexi_utils:recv is mocked to return
% {error, foo}. The test expects start/5 to return that same tuple and
% fabric_util:cleanup to have been called with one argument.
cleanup_called_on_error(_) ->
    Shards = [
        #shard{node = 'n1', ref = make_ref()},
        #shard{node = 'n2', ref = make_ref()}
    ],
    RecvError = fun(_, _, _, _, _, _) -> {error, foo} end,
    meck:expect(rexi_utils, recv, RecvError),
    % Forget earlier calls so the meck:called/3 check below only sees this run.
    meck:reset(fabric_util),
    ?assertEqual(
        {error, foo},
        start(Shards, #shard.ref, undefined, undefined, [])
    ),
    ?assert(meck:called(fabric_util, cleanup, 1)).

% Test fixture setup: creates passthrough mocks for rexi_utils, config and
% fabric_util, then installs the stubbed expectations used by the tests.
% Every meck call is matched against `ok` so a failed mock setup crashes here
% instead of surfacing later as a confusing test failure.
% NOTE(review): rexi and chttpd_util get expectations but are not created with
% meck:new/2 in this function - presumably they are mocked elsewhere; confirm.
setup() ->
    ok = meck:new(rexi_utils, [passthrough]),
    ok = meck:new(config, [passthrough]),
    ok = meck:new(fabric_util, [passthrough]),
    % config:get/3 always falls back to the caller-supplied default.
    ok = meck:expect(config, get, fun(_, _, Default) -> Default end),
    ok = meck:expect(rexi, kill_all, fun(_) -> ok end),
    % Speed up disconnect socket timeout for the test to 200 msec
    ok = meck:expect(chttpd_util, mochiweb_client_req_check_msec, 0, 200).
Expand Down
10 changes: 2 additions & 8 deletions src/fabric/src/fabric_view_all_docs.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,8 @@ go(Db, Options, #mrargs{keys = undefined} = QueryArgs, Callback, Acc) ->
after
fabric_streams:cleanup(Workers)
end;
{timeout, NewState} ->
DefunctWorkers = fabric_util:remove_done_workers(
NewState#stream_acc.workers, waiting
),
fabric_util:log_timeout(
DefunctWorkers,
"all_docs"
),
{timeout, DefunctWorkers} ->
fabric_util:log_timeout(DefunctWorkers, "all_docs"),
Callback({error, timeout}, Acc);
{error, Error} ->
Callback({error, Error}, Acc)
Expand Down
11 changes: 2 additions & 9 deletions src/fabric/src/fabric_view_changes.erl
Original file line number Diff line number Diff line change
Expand Up @@ -199,15 +199,8 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) ->
after
fabric_streams:cleanup(Workers)
end;
{timeout, NewState} ->
DefunctWorkers = fabric_util:remove_done_workers(
NewState#stream_acc.workers,
waiting
),
fabric_util:log_timeout(
DefunctWorkers,
"changes"
),
{timeout, DefunctWorkers} ->
fabric_util:log_timeout(DefunctWorkers, "changes"),
throw({error, timeout});
{error, Reason} ->
throw({error, Reason});
Expand Down
12 changes: 2 additions & 10 deletions src/fabric/src/fabric_view_map.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

-include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").

go(DbName, Options, GroupId, View, Args, Callback, Acc, VInfo) when
Expand Down Expand Up @@ -66,15 +65,8 @@ go(Db, Options, DDoc, View, Args0, Callback, Acc, VInfo) ->
after
fabric_streams:cleanup(Workers)
end;
{timeout, NewState} ->
DefunctWorkers = fabric_util:remove_done_workers(
NewState#stream_acc.workers,
waiting
),
fabric_util:log_timeout(
DefunctWorkers,
"map_view"
),
{timeout, DefunctWorkers} ->
fabric_util:log_timeout(DefunctWorkers, "map_view"),
Callback({error, timeout}, Acc);
{error, Error} ->
Callback({error, Error}, Acc)
Expand Down
12 changes: 2 additions & 10 deletions src/fabric/src/fabric_view_reduce.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

-include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").

go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when is_binary(GroupId) ->
Expand Down Expand Up @@ -55,15 +54,8 @@ go(Db, DDoc, VName, Args, Callback, Acc, VInfo) ->
after
fabric_streams:cleanup(Workers)
end;
{timeout, NewState} ->
DefunctWorkers = fabric_util:remove_done_workers(
NewState#stream_acc.workers,
waiting
),
fabric_util:log_timeout(
DefunctWorkers,
"reduce_view"
),
{timeout, DefunctWorkers} ->
fabric_util:log_timeout(DefunctWorkers, "reduce_view"),
Callback({error, timeout}, Acc);
{error, Error} ->
Callback({error, Error}, Acc)
Expand Down