Skip to content

Commit

Permalink
Merge pull request #50 from esl/less-checks
Browse files Browse the repository at this point in the history
Do not run check in cets_discovery on nodedown
  • Loading branch information
chrzaszcz committed Mar 4, 2024
2 parents 00ceb7b + fbaf1d8 commit aac8ba7
Show file tree
Hide file tree
Showing 4 changed files with 284 additions and 108 deletions.
29 changes: 24 additions & 5 deletions src/cets.erl
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,16 @@
}.
%% Status information returned `info/1'.

-type handle_down_fun() :: fun((#{remote_pid := server_pid(), table := table_name()}) -> ok).
-type handle_down_fun() :: fun(
(
#{
remote_pid := server_pid(),
remote_node := node(),
table := table_name(),
is_leader := boolean()
}
) -> ok
).
%% Handler function which is called when the remote node goes down.

-type handle_conflict_fun() :: fun((tuple(), tuple()) -> tuple()).
Expand Down Expand Up @@ -647,9 +656,12 @@ handle_down2(RemotePid, Reason, State = #{other_servers := Servers, ack_pid := A
case lists:member(RemotePid, Servers) of
true ->
cets_ack:send_remote_down(AckPid, RemotePid),
call_user_handle_down(RemotePid, State),
Servers2 = lists:delete(RemotePid, Servers),
update_node_down_history(RemotePid, Reason, set_other_servers(Servers2, State));
State3 = update_node_down_history(
RemotePid, Reason, set_other_servers(Servers2, State)
),
call_user_handle_down(RemotePid, State3),
State3;
false ->
%% This should not happen
?LOG_ERROR(#{
Expand Down Expand Up @@ -896,10 +908,17 @@ handle_get_info(

%% Cleanup
-spec call_user_handle_down(server_pid(), state()) -> ok.
call_user_handle_down(RemotePid, #{tab := Tab, opts := Opts}) ->
call_user_handle_down(RemotePid, #{tab := Tab, opts := Opts, is_leader := IsLeader}) ->
case Opts of
#{handle_down := F} ->
FF = fun() -> F(#{remote_pid => RemotePid, table => Tab}) end,
FF = fun() ->
F(#{
remote_pid => RemotePid,
remote_node => node(RemotePid),
table => Tab,
is_leader => IsLeader
})
end,
Info = #{
task => call_user_handle_down,
table => Tab,
Expand Down
69 changes: 52 additions & 17 deletions src/cets_discovery.erl
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
-type get_nodes_result() :: {ok, [node()]} | {error, term()}.
%% Result of `get_nodes/2' call.

-type retry_type() :: initial | after_error | regular.
-type retry_type() :: initial | after_error | regular | after_nodedown.
%% Retry logic type.

-type from() :: {pid(), reference()}.
Expand All @@ -107,7 +107,7 @@
phase := initial | regular,
results := [join_result()],
nodes := ordsets:ordset(node()),
%% The nodes that returned pang, sorted
%% The nodes that returned pang or nodedown, sorted
unavailable_nodes := ordsets:ordset(node()),
tables := [atom()],
backend_module := module(),
Expand Down Expand Up @@ -216,7 +216,7 @@ wait_for_get_nodes(Server, Timeout) ->
%% @private
-spec init(term()) -> {ok, state()}.
init(Opts) ->
StartTime = erlang:system_time(millisecond),
StartTime = get_time(),
%% Sends nodeup / nodedown
ok = net_kernel:monitor_nodes(true),
Mod = maps:get(backend_module, Opts, cets_discovery_file),
Expand Down Expand Up @@ -308,8 +308,6 @@ handle_info({nodeup, Node}, State) ->
{noreply, try_joining(State3)};
handle_info({nodedown, Node}, State) ->
State2 = handle_nodedown(Node, State),
%% Do another check to update unavailable_nodes list
self() ! check,
{noreply, State2};
handle_info({start_time, Node, StartTime}, State) ->
{noreply, handle_receive_start_time(Node, StartTime, State)};
Expand Down Expand Up @@ -409,6 +407,9 @@ prune_unavailable_nodes_if_needed(State = #{nodes := Nodes, unavailable_nodes :=
%% Unavailable nodes is a subset of discovered nodes
State#{unavailable_nodes := ordsets:intersection(Nodes, UnNodes)}.

%% We should not ping nodes that just got disconnected.
%% Let the disconnected node to connect if it restarts on its own.
%% Or reconnect to it after a timeout.
-spec ping_not_connected_nodes([node()]) -> ok.
ping_not_connected_nodes(Nodes) ->
Self = self(),
Expand Down Expand Up @@ -454,16 +455,36 @@ choose_retry_type(#{last_get_nodes_result := {error, _}}) ->
after_error;
choose_retry_type(#{phase := initial}) ->
initial;
choose_retry_type(_) ->
regular.
choose_retry_type(State) ->
case last_node_down(State) of
false ->
regular;
Node ->
%% Allow to reconnect after a netsplit but not too quick.
GracePeriod = retry_type_to_timeout(after_nodedown),
case get_downtime(Node, State) < GracePeriod of
true ->
after_nodedown;
false ->
regular
end
end.

-spec last_node_down(state()) -> false | node().
last_node_down(#{nodedown_timestamps := Map}) when map_size(Map) =:= 0 ->
false;
last_node_down(#{nodedown_timestamps := Map}) ->
{Node, _TS} = lists:last(lists:keysort(2, maps:to_list(Map))),
Node.

%% Returns timeout in milliseconds to retry calling the get_nodes function.
%% get_nodes is called after add_table without waiting.
%% It is also would be retried without waiting if should_retry_get_nodes set to true.
-spec retry_type_to_timeout(retry_type()) -> non_neg_integer().
retry_type_to_timeout(initial) -> timer:seconds(5);
retry_type_to_timeout(after_error) -> timer:seconds(1);
retry_type_to_timeout(regular) -> timer:minutes(5).
retry_type_to_timeout(regular) -> timer:minutes(5);
retry_type_to_timeout(after_nodedown) -> timer:seconds(30).

-spec cancel_old_timer(state()) -> ok.
cancel_old_timer(#{timer_ref := OldRef}) when is_reference(OldRef) ->
Expand All @@ -482,8 +503,9 @@ flush_all_checks() ->

-spec do_join(atom(), node()) -> join_result().
do_join(Tab, Node) ->
%% Possible race condition: Node got disconnected
LocalPid = whereis(Tab),
%% That would trigger autoconnect for the first time
%% That could trigger autoconnect if Node is not connected
case rpc:call(Node, erlang, whereis, [Tab]) of
Pid when is_pid(Pid), is_pid(LocalPid) ->
Result = cets_join:join(cets_discovery, #{table => Tab}, LocalPid, Pid),
Expand Down Expand Up @@ -560,10 +582,10 @@ has_join_result_for(Node, Table, #{results := Results}) ->

-spec handle_system_info(state()) -> system_info().
handle_system_info(State) ->
State#{verify_ready => verify_ready(State)}.
State#{verify_ready => verify_ready(State), retry_type => choose_retry_type(State)}.

-spec handle_nodedown(node(), state()) -> state().
handle_nodedown(Node, State) ->
handle_nodedown(Node, State = #{unavailable_nodes := UnNodes}) ->
State2 = remember_nodedown_timestamp(Node, State),
{NodeUpTime, State3} = remove_nodeup_timestamp(Node, State2),
?LOG_WARNING(
Expand All @@ -574,7 +596,8 @@ handle_nodedown(Node, State) ->
time_since_startup_in_milliseconds => time_since_startup_in_milliseconds(State)
})
),
State3.
State4 = State3#{unavailable_nodes := ordsets:add_element(Node, UnNodes)},
trigger_verify_ready(State4).

-spec handle_nodeup(node(), state()) -> state().
handle_nodeup(Node, State) ->
Expand All @@ -595,13 +618,13 @@ handle_nodeup(Node, State) ->

-spec remember_nodeup_timestamp(node(), state()) -> state().
remember_nodeup_timestamp(Node, State = #{nodeup_timestamps := Map}) ->
Time = erlang:system_time(millisecond),
Time = get_time(),
Map2 = Map#{Node => Time},
State#{nodeup_timestamps := Map2}.

-spec remember_nodedown_timestamp(node(), state()) -> state().
remember_nodedown_timestamp(Node, State = #{nodedown_timestamps := Map}) ->
Time = erlang:system_time(millisecond),
Time = get_time(),
Map2 = Map#{Node => Time},
State#{nodedown_timestamps := Map2}.

Expand All @@ -617,6 +640,7 @@ calculate_uptime(undefined) ->
calculate_uptime(StartTime) ->
time_since(StartTime).

-spec get_downtime(node(), state()) -> milliseconds() | undefined.
get_downtime(Node, #{nodedown_timestamps := Map}) ->
case maps:get(Node, Map, undefined) of
undefined ->
Expand All @@ -633,8 +657,13 @@ set_defined(Key, Value, Map) ->
time_since_startup_in_milliseconds(#{start_time := StartTime}) ->
time_since(StartTime).

time_since(StartTime) ->
erlang:system_time(millisecond) - StartTime.
-spec time_since(integer()) -> integer().
time_since(StartTime) when is_integer(StartTime) ->
get_time() - StartTime.

-spec get_time() -> milliseconds().
get_time() ->
erlang:system_time(millisecond).

send_start_time_to(Node, #{start_time := StartTime}) ->
case erlang:process_info(self(), registered_name) of
Expand All @@ -659,4 +688,10 @@ handle_receive_start_time(Node, StartTime, State = #{node_start_timestamps := Ma
%% Restarted node reconnected, this is fine during the rolling updates
ok
end,
State#{node_start_timestamps := maps:put(Node, StartTime, Map)}.
%% We are in the regular phase,
%% once we get contact with another disco server.
%% It affects the check intervals.
State#{
node_start_timestamps := maps:put(Node, StartTime, Map),
phase := regular
}.
2 changes: 1 addition & 1 deletion src/cets_join.erl
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ join_loop(LockKey, Info, LocalPid, RemotePid, Start, JoinOpts) ->
LockRequest = {LockKey, self()},
%% Just lock all nodes, no magic here :)
Nodes = [node() | nodes()],
Retries = 1,
Retries = 0,
%% global could abort the transaction when one of the nodes goes down.
%% It could usually abort it during startup or update.
case global:trans(LockRequest, F, Nodes, Retries) of
Expand Down
Loading

0 comments on commit aac8ba7

Please sign in to comment.