Skip to content

Commit

Permalink
Shutdown Procedure for remaining sockets
Browse files Browse the repository at this point in the history
When stopping a node, cowboy is waiting for all connections to
be correctly closed. Usually, it waits for the buffer to be
empty before closing the socket. In most of the cases, this
behavior is acceptable, but in some cases, in particular when a
connection is very slow, this can have a direct impact on the
delay to shutdown a node.

This commit adds a shutdown procedure for stopping the client
connections. It has 3 stages: (1) when a node is stopped, all
sockets are set to read-only mode and all clients are notified of
this change, so they will try to fetch the remaining data before
closing the connection; (2) if some connections are still present,
the procedure sends another close request and waits for a specific
delay; (3) if the connection is still up, it is simply killed: the
linger option is set to {true, 0} and the socket is definitively
closed.

This draining procedure was inspired by the connection draining
example in the Ranch documentation, which can be found at:
https://ninenines.eu/docs/en/ranch/2.2/guide/connection_draining/

see: https://github.com/ArweaveTeam/arweave-dev/issues/817
  • Loading branch information
arweave committed Mar 7, 2025
1 parent ebdf8da commit 82c4542
Show file tree
Hide file tree
Showing 4 changed files with 197 additions and 4 deletions.
3 changes: 2 additions & 1 deletion apps/arweave/include/ar_config.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,8 @@
%% Undocumented/unsupported options
chunk_storage_file_size = ?CHUNK_GROUP_SIZE,
rocksdb_flush_interval_s = ?DEFAULT_ROCKSDB_FLUSH_INTERVAL_S,
rocksdb_wal_sync_interval_s = ?DEFAULT_ROCKSDB_WAL_SYNC_INTERVAL_S
rocksdb_wal_sync_interval_s = ?DEFAULT_ROCKSDB_WAL_SYNC_INTERVAL_S,
shutdown_tcp_connection_timeout = 60_000
}).

-endif.
15 changes: 14 additions & 1 deletion apps/arweave/src/ar.erl
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,8 @@ show_help() ->
"repacked. After completing a full verification cycle, you can restart "
"the node in normal mode to have it resync and/or repack any flagged chunks. "
"When running in verify mode several flags will be forced on and several "
"flags are disallowed. See the node output for details."}
"flags are disallowed. See the node output for details."},
{"shutdown_tcp_connection_timeout", "shutdown tcp connection timeout."}
]
),
erlang:halt().
Expand Down Expand Up @@ -664,6 +665,18 @@ parse_cli_args(["rocksdb_flush_interval", Seconds | Rest], C) ->
parse_cli_args(["rocksdb_wal_sync_interval", Seconds | Rest], C) ->
parse_cli_args(Rest, C#config{ rocksdb_wal_sync_interval_s = list_to_integer(Seconds) });

%% shutdown procedure
%% Parses the shutdown_tcp_connection_timeout command line option. The
%% value is the drain delay, in milliseconds, and must be a strictly
%% positive integer; invalid values are reported and the previous
%% (default) configuration is kept.
parse_cli_args(["shutdown_tcp_connection_timeout", Delay | Rest], C) ->
	try
		IntegerDelay = list_to_integer(Delay),
		%% reject zero and negative delays as well as non-integers.
		true = IntegerDelay > 0,
		parse_cli_args(Rest, C#config{ shutdown_tcp_connection_timeout = IntegerDelay })
	catch
		_:_ ->
			io:format("shutdown_tcp_connection_timeout must be a positive integer.~n"),
			parse_cli_args(Rest, C)
	end;

%% Undocumented/unsupported options
parse_cli_args(["chunk_storage_file_size", Num | Rest], C) ->
parse_cli_args(Rest, C#config{ chunk_storage_file_size = list_to_integer(Num) });
Expand Down
8 changes: 8 additions & 0 deletions apps/arweave/src/ar_config.erl
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,14 @@ parse_options([{<<"data_sync_request_packed_chunks">>, Bool} | Rest], Config)
parse_options([{<<"data_sync_request_packed_chunks">>, InvalidValue} | _Rest], _Config) ->
{error, {bad_type, data_sync_request_packed_chunks, boolean}, InvalidValue};

%% shutdown procedure
%% Accepts shutdown_tcp_connection_timeout from the configuration file
%% when it is a strictly positive integer (milliseconds); any other
%% value is rejected with a bad_type error.
parse_options([{<<"shutdown_tcp_connection_timeout">>, Delay} | Rest], Config)
		when is_integer(Delay) andalso Delay > 0 ->
	parse_options(Rest, Config#config{ shutdown_tcp_connection_timeout = Delay });
%% unused variables are underscored to avoid compiler warnings, matching
%% the style of the other error clauses in this function.
parse_options([{<<"shutdown_tcp_connection_timeout">>, InvalidValue} | _Rest], _Config) ->
	{error, {bad_type, shutdown_tcp_connection_timeout, integer}, InvalidValue};

parse_options([Opt | _], _) ->
{error, unknown, Opt};
parse_options([], Config) ->
Expand Down
175 changes: 173 additions & 2 deletions apps/arweave/src/ar_http_iface_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ start_http_iface_listener(Config) ->
TlsCertfilePath = Config#config.tls_cert_file,
TlsKeyfilePath = Config#config.tls_key_file,
TransportOpts = [
{linger, {true, 0}},
{port, Config#config.port},
{keepalive, true},
{max_connections, Config#config.max_connections}
Expand All @@ -95,8 +94,180 @@ start_http_iface_listener(Config) ->
end,
ok.

%%--------------------------------------------------------------------
%% @doc stop the application and execute the shutdown procedure.
%% The drain delay comes from the shutdown_tcp_connection_timeout
%% configuration option (milliseconds), falling back to 60s when the
%% configuration is not available. The listener is suspended first so
%% that no new connection is accepted while draining.
%% @end
%%--------------------------------------------------------------------
stop() ->
	%% NOTE(review): assumes the node configuration is stored in the
	%% arweave application env under the `config' key — confirm against
	%% how the rest of the code base reads #config{}.
	Delay =
		case application:get_env(arweave, config) of
			{ok, Config} -> Config#config.shutdown_tcp_connection_timeout;
			undefined -> 60_000
		end,
	ok = ranch:suspend_listener(ar_http_iface_listener),
	Pids = terminate_connections(Delay),
	terminate_listener(Pids, Delay).

%%--------------------------------------------------------------------
%% @doc ensure all connections have been closed before stopping the
%% cowboy listener. Waits for the 'DOWN' message of every spawned
%% termination process; the listener is stopped as soon as all of them
%% are done, or after three times the configured delay.
%% @end
%%--------------------------------------------------------------------
terminate_listener([], _Delay) ->
	%% every termination process finished: stop the listener right away
	%% instead of waiting for the full timeout.
	cowboy:stop_listener(ar_http_iface_listener);
terminate_listener(Pids, Delay) ->
	AfterDelay = Delay*3,
	receive
		{'DOWN', Ref, process, Pid, _} ->
			%% drop the terminated process from the list. The comparison
			%% must be explicit: reusing Pid/Ref as fresh fun-head
			%% variables would shadow the outer bindings and match (and
			%% drop) every entry.
			NewPids = [ {P, R} || {P, R} <- Pids, {P, R} =/= {Pid, Ref} ],
			terminate_listener(NewPids, Delay);
		_ ->
			terminate_listener(Pids, Delay)
	after
		AfterDelay ->
			cowboy:stop_listener(ar_http_iface_listener)
	end.

%%--------------------------------------------------------------------
%% @doc terminate all connections. A delay can be set to force close
%% or kill the remaining connections. Returns one {Pid, MonitorRef}
%% pair per spawned termination process; the list is empty when no
%% connection is active.
%% @end
%%--------------------------------------------------------------------
-spec terminate_connections(Delay) -> Return when
	Delay :: pos_integer(),
	Return :: [{pid(), reference()}].

terminate_connections(Delay) ->
	Connections = ranch:procs(ar_http_iface_listener, connections),
	[ terminate_connection(C, Delay) || C <- Connections ].

%%--------------------------------------------------------------------
%% @doc terminate a ranch/cowboy connection. This follows the drain
%% procedure explained in the ranch and cowboy documentation. The
%% connection is a process pid as returned by ranch:procs/2, and the
%% result is the {Pid, MonitorRef} pair returned by spawn_monitor/1.
%% @end
%%--------------------------------------------------------------------
-spec terminate_connection(Connection, Delay) -> Return when
	Connection :: pid(),
	Delay :: pos_integer(),
	Return :: {pid(), reference()}.

terminate_connection(Connection, Delay) ->
	spawn_monitor(fun() ->
		terminate_connection_init(Connection, Delay)
	end).

%%--------------------------------------------------------------------
%% @hidden
%% @doc terminate connection init procedure. Monitors the connection
%% process, extracts its socket, switches the socket to read-only mode
%% and enters the termination loop. Returns ok when the socket is
%% already gone or cannot be inspected.
%% @end
%%--------------------------------------------------------------------
-spec terminate_connection_init(Connection, Delay) -> Return when
	Connection :: pid(),
	Delay :: pos_integer(),
	Return :: ok.

terminate_connection_init(Connection, Delay) ->
	% monitor the connection first. A ranch connection is a process
	% used as "interface" to a socket; the procedure must know if the
	% process is still alive.
	_Ref = erlang:monitor(process, Connection),

	% it seems ranch_tcp does not have a way to "kill" or "close" the
	% connection from the process. The port needs to be extracted; to
	% accomplish that, one can check using process_info/2 or extract
	% the process state with sys:get_state/1.
	{links, Links} = erlang:process_info(Connection, links),

	% in a normal situation, a ranch connection process must have
	% exactly one linked port (the socket). If none is present the
	% socket is already gone and there is nothing to drain.
	case [ P || P <- Links, is_port(P) ] of
		[] ->
			ok;
		[Socket|_] ->
			% check whether information about the socket can be
			% retrieved; the peer address/port is used for logging. This
			% also checks the socket is still active: if an exception is
			% raised, we assume the socket is already down.
			try inet:peername(Socket) of
				{ok, {PeerAddress, PeerPort}} ->
					% prepare the state used by the termination loop.
					Peer = string:join([inet:ntoa(PeerAddress), integer_to_list(PeerPort)], ":"),
					State = #{
						socket => Socket,
						peer => Peer,
						delay => Delay,
						connection => Connection
					},

					% everything is ready: set the socket to read-only
					% mode. The remote peer will notice and try to fetch
					% the last data from the buffer. This does not
					% guarantee the socket will close, in particular if
					% the connection is slow.
					logger:warning(#{ connection => Peer, socket => Connection, reason => "shutdown", msg => "shutdown (read-only)"}),
					ranch_tcp:shutdown(Socket, write),

					% two timeouts are armed: the first simply closes
					% the socket to announce once more that the node is
					% shutting down...
					_ = erlang:send_after(Delay, self(), {timeout, close}),

					% ... and a second one, after which the socket is
					% killed: linger is set to {true, 0} and the socket
					% is closed again.
					_ = erlang:send_after(Delay*2, self(), {timeout, kill}),

					% start the termination loop to shut the connection
					% down properly.
					terminate_connection_loop(State);
				_ ->
					ok
			catch
				_:_ ->
					ok
			end
	end.

%%--------------------------------------------------------------------
%% @hidden
%% @doc shutdown procedure loop, in charge of dealing with monitoring
%% messages and timeouts. The loop ends when the connection process
%% terminates (a 'DOWN' message) or when no message arrives for three
%% times the configured delay.
%% @end
%%--------------------------------------------------------------------
-spec terminate_connection_loop(State) -> Return when
	Socket :: port(),
	Connection :: pid(),
	Peer :: string(),
	Delay :: pos_integer(),
	State :: #{
		socket => Socket,
		connection => Connection,
		peer => Peer,
		delay => Delay
	},
	Return :: ok.

terminate_connection_loop(State) ->
	Socket = maps:get(socket, State),
	Connection = maps:get(connection, State),
	Peer = maps:get(peer, State),
	Delay = maps:get(delay, State),
	AfterDelay = Delay*3,
	receive
		{timeout, close} ->
			% the first timeout simply closes the socket a second time.
			logger:warning(#{ connection => Peer, socket => Connection, reason => "shutdown", msg => "closed (timeout)"}),
			ranch_tcp:close(Socket),
			terminate_connection_loop(State);

		{timeout, kill} ->
			% the second timeout sets the linger tcp parameter to 0 and
			% closes the connection (again). At this stage, the
			% connection should be closed and a message from the monitor
			% should follow.
			logger:warning(#{ connection => Peer, socket => Connection, reason => "shutdown", msg => "killed (timeout)"}),
			ranch_tcp:setopts(Socket, [{linger, {true, 0}}]),
			ranch_tcp:close(Socket),
			terminate_connection_loop(State);

		{'DOWN', _Ref, process, Connection, _} ->
			% the connection process terminated: the drain is complete.
			logger:notice(#{ connection => Peer, socket => Connection, reason => "shutdown", msg => "terminated"});

		Msg ->
			% unexpected messages are logged and ignored; the loop must
			% keep running, otherwise a stray message would abort the
			% drain procedure while the timers are still pending.
			logger:warning("received: ~p", [Msg]),
			terminate_connection_loop(State)
	after
		AfterDelay ->
			logger:error(#{ connection => Peer, socket => Connection, reason => "shutdown", msg => "can't kill"})
	end.

name_route([]) ->
"/";
Expand Down

0 comments on commit 82c4542

Please sign in to comment.