From 31f0da38419aa012e322fbc3ba3f92624c7d1d43 Mon Sep 17 00:00:00 2001 From: Nick Vatamaniuc Date: Fri, 2 Aug 2024 02:25:04 -0400 Subject: [PATCH] Improve fabric streams cleanup on error and timeouts Previously, we performed cleanup only for specific errors such as `ddoc_updated` and `insufficient_storage`. In case of other errors or timeouts, there was a chance we would leak workers waiting to be either started or canceled. Those workers would then wait around until the 5 minute rexi timeout fired, and then emit an error in the logs. It's not a big deal, as that happens on errors only and the processes are all waiting in receive; however, they do hold a Db handle open, so they can waste resources from that point of view. To fix that, this commit extends cleanup to other errors and timeouts. Moreover, in case of timeouts, we log fabric worker timeout errors. Previously, in order to do that, we exported the `fabric_streams` internal `#stream_acc` record to every `fabric_streams` user. That's a bit untidy, so make the timeout error return the defunct workers only; that way we avoid leaking the `#stream_acc` record outside the `fabric_streams` module. 
Related to https://github.com/apache/couchdb/issues/5127 --- .../src/couch_replicator_fabric.erl | 10 +--- src/fabric/include/fabric.hrl | 8 --- src/fabric/src/fabric_streams.erl | 55 +++++++++++++++++-- src/fabric/src/fabric_view_all_docs.erl | 10 +--- src/fabric/src/fabric_view_changes.erl | 11 +--- src/fabric/src/fabric_view_map.erl | 12 +--- src/fabric/src/fabric_view_reduce.erl | 12 +--- 7 files changed, 59 insertions(+), 59 deletions(-) diff --git a/src/couch_replicator/src/couch_replicator_fabric.erl b/src/couch_replicator/src/couch_replicator_fabric.erl index 43321f26fe..cb441fea71 100644 --- a/src/couch_replicator/src/couch_replicator_fabric.erl +++ b/src/couch_replicator/src/couch_replicator_fabric.erl @@ -34,14 +34,8 @@ docs(DbName, Options, QueryArgs, Callback, Acc) -> after fabric_streams:cleanup(Workers) end; - {timeout, NewState} -> - DefunctWorkers = fabric_util:remove_done_workers( - NewState#stream_acc.workers, waiting - ), - fabric_util:log_timeout( - DefunctWorkers, - "replicator docs" - ), + {timeout, DefunctWorkers} -> + fabric_util:log_timeout(DefunctWorkers, "replicator docs"), Callback({error, timeout}, Acc); {error, Error} -> Callback({error, Error}, Acc) diff --git a/src/fabric/include/fabric.hrl b/src/fabric/include/fabric.hrl index dd312f0289..6312741c23 100644 --- a/src/fabric/include/fabric.hrl +++ b/src/fabric/include/fabric.hrl @@ -32,14 +32,6 @@ update_seq }). --record(stream_acc, { - workers, - ready, - start_fun, - replacements, - ring_opts -}). - -record(view_row, {key, id, value, doc, worker}). -type row_property_key() :: id | key | value | doc | worker. diff --git a/src/fabric/src/fabric_streams.erl b/src/fabric/src/fabric_streams.erl index 3f14cde454..85a4978c4e 100644 --- a/src/fabric/src/fabric_streams.erl +++ b/src/fabric/src/fabric_streams.erl @@ -23,9 +23,16 @@ add_worker_to_cleaner/2 ]). --include_lib("fabric/include/fabric.hrl"). -include_lib("mem3/include/mem3.hrl"). 
+-record(stream_acc, { + workers, + ready, + start_fun, + replacements, + ring_opts +}). + -define(WORKER_CLEANER, fabric_worker_cleaner). % This is the streams equivalent of fabric_util:submit_jobs/4. Besides @@ -77,7 +84,12 @@ start(Workers0, Keypos, StartFun, Replacements, RingOpts) -> Workers ), {ok, AckedWorkers}; + {timeout, #stream_acc{workers = Defunct}} -> + cleanup(Workers0), + DefunctWorkers = fabric_util:remove_done_workers(Defunct, waiting), + {timeout, DefunctWorkers}; Else -> + cleanup(Workers0), Else end. @@ -165,10 +177,7 @@ handle_stream_start(rexi_STREAM_INIT, {Worker, From}, St) -> {stop, St#stream_acc{workers = [], ready = Ready1}} end end; -handle_stream_start({ok, Error}, _, St) when Error == ddoc_updated; Error == insufficient_storage -> - WaitingWorkers = [W || {W, _} <- St#stream_acc.workers], - ReadyWorkers = [W || {W, _} <- St#stream_acc.ready], - cleanup(WaitingWorkers ++ ReadyWorkers), +handle_stream_start({ok, Error}, _, _) when Error == ddoc_updated; Error == insufficient_storage -> {stop, Error}; handle_stream_start(Else, _, _) -> exit({invalid_stream_start, Else}). @@ -236,7 +245,9 @@ worker_cleaner_test_() -> ?TDEF_FE(should_clean_additional_worker_too), ?TDEF_FE(coordinator_is_killed_if_client_disconnects), ?TDEF_FE(coordinator_is_not_killed_if_client_is_connected), - ?TDEF_FE(submit_jobs_sets_up_cleaner) + ?TDEF_FE(submit_jobs_sets_up_cleaner), + ?TDEF_FE(cleanup_called_on_timeout), + ?TDEF_FE(cleanup_called_on_error) ] } }. @@ -442,7 +453,39 @@ submit_jobs_sets_up_cleaner(_) -> ?assert(is_process_alive(Cleaner)) end. 
+cleanup_called_on_timeout(_) -> + Ref1 = make_ref(), + Ref2 = make_ref(), + W1 = #shard{node = 'n1', ref = Ref1}, + W2 = #shard{node = 'n2', ref = Ref2}, + Workers = [W1, W2], + meck:expect(rexi_utils, recv, fun(_, _, _, Acc, _, _) -> + {timeout, Acc#stream_acc{workers = [{W2, waiting}]}} + end), + meck:reset(fabric_util), + Res = start(Workers, #shard.ref, undefined, undefined, []), + ?assertEqual({timeout, [W2]}, Res), + ?assert(meck:called(fabric_util, cleanup, 1)). + +cleanup_called_on_error(_) -> + Ref1 = make_ref(), + Ref2 = make_ref(), + W1 = #shard{node = 'n1', ref = Ref1}, + W2 = #shard{node = 'n2', ref = Ref2}, + Workers = [W1, W2], + meck:expect(rexi_utils, recv, fun(_, _, _, _, _, _) -> + {error, foo} + end), + meck:reset(fabric_util), + Res = start(Workers, #shard.ref, undefined, undefined, []), + ?assertEqual({error, foo}, Res), + ?assert(meck:called(fabric_util, cleanup, 1)). + setup() -> + ok = meck:new(rexi_utils, [passthrough]), + ok = meck:new(config, [passthrough]), + ok = meck:new(fabric_util, [passthrough]), + meck:expect(config, get, fun(_, _, Default) -> Default end), ok = meck:expect(rexi, kill_all, fun(_) -> ok end), % Speed up disconnect socket timeout for the test to 200 msec ok = meck:expect(chttpd_util, mochiweb_client_req_check_msec, 0, 200). 
diff --git a/src/fabric/src/fabric_view_all_docs.erl b/src/fabric/src/fabric_view_all_docs.erl index 3a03357c24..2d0133acb5 100644 --- a/src/fabric/src/fabric_view_all_docs.erl +++ b/src/fabric/src/fabric_view_all_docs.erl @@ -37,14 +37,8 @@ go(Db, Options, #mrargs{keys = undefined} = QueryArgs, Callback, Acc) -> after fabric_streams:cleanup(Workers) end; - {timeout, NewState} -> - DefunctWorkers = fabric_util:remove_done_workers( - NewState#stream_acc.workers, waiting - ), - fabric_util:log_timeout( - DefunctWorkers, - "all_docs" - ), + {timeout, DefunctWorkers} -> + fabric_util:log_timeout(DefunctWorkers, "all_docs"), Callback({error, timeout}, Acc); {error, Error} -> Callback({error, Error}, Acc) diff --git a/src/fabric/src/fabric_view_changes.erl b/src/fabric/src/fabric_view_changes.erl index 85bc7370cc..410c057c27 100644 --- a/src/fabric/src/fabric_view_changes.erl +++ b/src/fabric/src/fabric_view_changes.erl @@ -199,15 +199,8 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) -> after fabric_streams:cleanup(Workers) end; - {timeout, NewState} -> - DefunctWorkers = fabric_util:remove_done_workers( - NewState#stream_acc.workers, - waiting - ), - fabric_util:log_timeout( - DefunctWorkers, - "changes" - ), + {timeout, DefunctWorkers} -> + fabric_util:log_timeout(DefunctWorkers, "changes"), throw({error, timeout}); {error, Reason} -> throw({error, Reason}); diff --git a/src/fabric/src/fabric_view_map.erl b/src/fabric/src/fabric_view_map.erl index 6f13270a98..cc8ed6cf1c 100644 --- a/src/fabric/src/fabric_view_map.erl +++ b/src/fabric/src/fabric_view_map.erl @@ -16,7 +16,6 @@ -include_lib("fabric/include/fabric.hrl"). -include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). -include_lib("couch_mrview/include/couch_mrview.hrl"). 
go(DbName, Options, GroupId, View, Args, Callback, Acc, VInfo) when @@ -66,15 +65,8 @@ go(Db, Options, DDoc, View, Args0, Callback, Acc, VInfo) -> after fabric_streams:cleanup(Workers) end; - {timeout, NewState} -> - DefunctWorkers = fabric_util:remove_done_workers( - NewState#stream_acc.workers, - waiting - ), - fabric_util:log_timeout( - DefunctWorkers, - "map_view" - ), + {timeout, DefunctWorkers} -> + fabric_util:log_timeout(DefunctWorkers, "map_view"), Callback({error, timeout}, Acc); {error, Error} -> Callback({error, Error}, Acc) diff --git a/src/fabric/src/fabric_view_reduce.erl b/src/fabric/src/fabric_view_reduce.erl index 04d73bd943..3206d01a44 100644 --- a/src/fabric/src/fabric_view_reduce.erl +++ b/src/fabric/src/fabric_view_reduce.erl @@ -16,7 +16,6 @@ -include_lib("fabric/include/fabric.hrl"). -include_lib("mem3/include/mem3.hrl"). --include_lib("couch/include/couch_db.hrl"). -include_lib("couch_mrview/include/couch_mrview.hrl"). go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when is_binary(GroupId) -> @@ -55,15 +54,8 @@ go(Db, DDoc, VName, Args, Callback, Acc, VInfo) -> after fabric_streams:cleanup(Workers) end; - {timeout, NewState} -> - DefunctWorkers = fabric_util:remove_done_workers( - NewState#stream_acc.workers, - waiting - ), - fabric_util:log_timeout( - DefunctWorkers, - "reduce_view" - ), + {timeout, DefunctWorkers} -> + fabric_util:log_timeout(DefunctWorkers, "reduce_view"), Callback({error, timeout}, Acc); {error, Error} -> Callback({error, Error}, Acc)