Skip to content

Commit

Permalink
Header Synchronization Improvement
Browse files Browse the repository at this point in the history
Added a way to know if a peer is up or down
in `ar_peers` by calling `ar_peers:connected_peer/1`
and `ar_peers:disconnected_peer/1` functions. To
know if a peer is activate, one can call
`ar_peers:is_connected_peer/1` function.

All connected peers are now being tagged with a
timestamp with one connection is successful. This
timestamp can be retrieved using
`ar_peers:get_connection_timestamp/1` function.

lifetime peers list did not change, but current
peers list is now only displaying the active
peers during the last 30 days by default.

"tags" are used as simple key in `ar_peers`
ets table, prefixed with the `ar_tags` atom. They
are saved in the `peers` file maintained with
`ar_storage` function.

Modified `ar_http_iface_client:get_block_shadow/2`
function to add more parameters, in particular
the value used to pick a peer from peer list.

Modified `ar_header_sync` module to only select
active peer when synchronizing headers.

Fixed a typo in `bin/console`.
  • Loading branch information
humaite committed Oct 9, 2024
1 parent a283bcc commit b479bfc
Show file tree
Hide file tree
Showing 11 changed files with 219 additions and 42 deletions.
3 changes: 3 additions & 0 deletions apps/arweave/include/ar_config.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@
%% The default number of chunks fetched from disk at a time during in-place repacking.
-define(DEFAULT_REPACK_BATCH_SIZE, 100).

%% default filtering value for the peer list (30days)
-define(CURRENT_PEERS_LIST_FILTER, 30*60*60*24).

%% @doc Startup options with default values.
-record(config, {
init = false,
Expand Down
3 changes: 2 additions & 1 deletion apps/arweave/src/ar.erl
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,8 @@ show_help() ->
{"pool_server_address", "The pool address"},
{"pool_worker_name", "(optional) The pool worker name. "
"Useful if you have multiple machines (or replicas) "
"and you want to monitor them separately on pool"}
"and you want to monitor them
separately on pool"}
]
),
erlang:halt().
Expand Down
4 changes: 2 additions & 2 deletions apps/arweave/src/ar_data_discovery.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).

-include_lib("arweave/include/ar.hrl").
-include_lib("arweave/include/ar_config.hrl").
-include_lib("arweave/include/ar_data_discovery.hrl").

-record(state, {
Expand Down Expand Up @@ -204,9 +205,8 @@ pick_peers(Peers, PeerLen, N) ->

collect_peers() ->
N = ?DATA_DISCOVERY_COLLECT_PEERS_COUNT,
%% rank peers by their current rating since we care about their recent throughput performance
%% rank peers by their current rating since we care out their recent throughput performance
collect_peers(lists:sublist(ar_peers:get_peers(current), N)).

collect_peers([Peer | Peers]) ->
gen_server:cast(?MODULE, {add_peer, Peer}),
collect_peers(Peers);
Expand Down
5 changes: 3 additions & 2 deletions apps/arweave/src/ar_header_sync.erl
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ check_fork(Height, H, TXRoot) ->
end.

download_block(H, H2, TXRoot) ->
Peers = ar_peers:get_peers(lifetime),
Peers = ar_peers:get_peers(current),
case ar_storage:read_block(H) of
unavailable ->
download_block(Peers, H, H2, TXRoot);
Expand All @@ -492,7 +492,8 @@ download_block(H, H2, TXRoot) ->

download_block(Peers, H, H2, TXRoot) ->
Fork_2_0 = ar_fork:height_2_0(),
case ar_http_iface_client:get_block_shadow(Peers, H) of
Opts = #{ rand_min => length(Peers) },
case ar_http_iface_client:get_block_shadow(Peers, H, Opts) of
unavailable ->
?LOG_WARNING([
{event, ar_header_sync_failed_to_download_block_header},
Expand Down
5 changes: 5 additions & 0 deletions apps/arweave/src/ar_http.erl
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,12 @@ handle_info({gun_up, PID, _Protocol}, #state{ status_by_pid = StatusByPID } = St
[gen_server:reply(ReplyTo, {ok, PID}) || {ReplyTo, _} <- PendingRequests],
StatusByPID2 = maps:put(PID, {connected, MonitorRef, Peer}, StatusByPID),
prometheus_gauge:inc(outbound_connections),
ar_peers:connected_peer(Peer),
{noreply, State#state{ status_by_pid = StatusByPID2 }};
{connected, _MonitorRef, Peer} ->
?LOG_WARNING([{event, gun_up_pid_already_exists},
{peer, ar_util:format_peer(Peer)}]),
ar_peers:connected_peer(Peer),
{noreply, State}
end;

Expand Down Expand Up @@ -179,6 +181,7 @@ handle_info({gun_error, PID, Reason},
prometheus_gauge:dec(outbound_connections),
ok
end,
ar_peers:disconnected_peer(Peer),
gun:shutdown(PID),
?LOG_DEBUG([{event, connection_error}, {reason, io_lib:format("~p", [Reason])}]),
{noreply, State#state{ status_by_pid = StatusByPID2, pid_by_peer = PIDByPeer2 }}
Expand Down Expand Up @@ -208,6 +211,7 @@ handle_info({gun_down, PID, Protocol, Reason, _KilledStreams, _UnprocessedStream
prometheus_gauge:dec(outbound_connections),
ok
end,
ar_peers:disconnected_peer(Peer),
{noreply, State#state{ status_by_pid = StatusByPID2, pid_by_peer = PIDByPeer2 }}
end;

Expand All @@ -226,6 +230,7 @@ handle_info({'DOWN', _Ref, process, PID, Reason},
prometheus_gauge:dec(outbound_connections),
ok
end,
ar_peers:disconnected_peer(Peer),
{noreply, State#state{ status_by_pid = StatusByPID2, pid_by_peer = PIDByPeer2 }}
end;

Expand Down
65 changes: 43 additions & 22 deletions apps/arweave/src/ar_http_iface_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,22 @@

-module(ar_http_iface_client).

-export([send_block_json/3, send_block_binary/3, send_block_binary/4, send_tx_json/3,
send_tx_binary/3, send_block_announcement/2, get_block_shadow/2, get_block_shadow/3,
get_block/3, get_tx/2, get_txs/2, get_tx_from_remote_peer/2, get_tx_data/2,
get_wallet_list_chunk/2, get_wallet_list_chunk/3, get_wallet_list/2,
add_peer/1, get_info/1, get_info/2, get_peers/1, get_time/2, get_height/1,
get_block_index/3, get_sync_record/1, get_sync_record/3,
get_chunk_binary/3, get_mempool/1, get_sync_buckets/1,
get_recent_hash_list/1, get_recent_hash_list_diff/2, get_reward_history/3,
get_block_time_history/3,
push_nonce_limiter_update/3, get_vdf_update/1, get_vdf_session/1,
get_previous_vdf_session/1, get_cm_partition_table/1, cm_h1_send/2, cm_h2_send/2,
cm_publish_send/2, get_jobs/2, post_partial_solution/2,
get_pool_cm_jobs/2, post_pool_cm_jobs/2, post_cm_partition_table_to_pool/2]).
-export([send_block_json/3, send_block_binary/3, send_block_binary/4,
send_tx_json/3, send_tx_binary/3, send_block_announcement/2,
get_block/3, get_tx/2, get_txs/2, get_tx_from_remote_peer/2,
get_tx_data/2, get_wallet_list_chunk/2, get_wallet_list_chunk/3,
get_wallet_list/2, add_peer/1, get_info/1, get_info/2, get_peers/1,
get_time/2, get_height/1, get_block_index/3, get_sync_record/1,
get_sync_record/3, get_chunk_binary/3, get_mempool/1,
get_sync_buckets/1, get_recent_hash_list/1,
get_recent_hash_list_diff/2, get_reward_history/3,
get_block_time_history/3, push_nonce_limiter_update/3,
get_vdf_update/1, get_vdf_session/1, get_previous_vdf_session/1,
get_cm_partition_table/1, cm_h1_send/2, cm_h2_send/2,
cm_publish_send/2, get_jobs/2, post_partial_solution/2,
get_pool_cm_jobs/2, post_pool_cm_jobs/2,
post_cm_partition_table_to_pool/2]).
-export([get_block_shadow/2, get_block_shadow/3, get_block_shadow/4]).

-include_lib("arweave/include/ar.hrl").
-include_lib("arweave/include/ar_config.hrl").
Expand Down Expand Up @@ -122,20 +125,38 @@ get_block(Peer, H, TXIndices) ->
{B, Time, Size}
end.

%% @doc Retreive a block shadow by hash or height from one of the given peers.
get_block_shadow([], _ID) ->
unavailable;
%%--------------------------------------------------------------------
%% @doc get a block shadow using default parameter.
%% @end
%%--------------------------------------------------------------------
get_block_shadow(Peers, ID) ->
Peer = lists:nth(rand:uniform(min(5, length(Peers))), Peers),
case get_block_shadow(ID, Peer, binary) of
get_block_shadow(Peers, ID, #{}).

%%--------------------------------------------------------------------
%% @doc Retrieve a block shadow by hash or height from one of the given
%% peers. Some options can be modified like `rand_min',
%% `connect_timeout' and `timeout'.
%% @see get_block_shadow/4
%% @end
%%--------------------------------------------------------------------
get_block_shadow([], _ID, _Opts) ->
unavailable;
get_block_shadow(Peers, ID, Opts) ->
RandMin = maps:get(rand_min, Opts, 5),
Random = rand:uniform(min(RandMin, length(Peers))),
Peer = lists:nth(Random, Peers),
case get_block_shadow(ID, Peer, binary, Opts) of
not_found ->
get_block_shadow(Peers -- [Peer], ID);
get_block_shadow(Peers -- [Peer], ID, Opts);
{ok, B, Time, Size} ->
{Peer, B, Time, Size}
end.

%% @doc Retreive a block shadow by hash or height from the given peer.
get_block_shadow(ID, Peer, Encoding) ->
%%--------------------------------------------------------------------
%% @doc Retrieve a block shadow by hash or height from the given peer.
%% @end
%%--------------------------------------------------------------------
get_block_shadow(ID, Peer, Encoding, _Opts) ->
handle_block_response(Peer, Encoding,
ar_http:req(#{
method => get,
Expand Down Expand Up @@ -1102,7 +1123,7 @@ get_info(Peer) ->
timeout => 2 * 1000
})
of
{ok, {{<<"200">>, _}, _, JSON, _, _}} ->
{ok, {{<<"200">>, _}, _, JSON, _, _}} ->
case ar_serialize:json_decode(JSON, [return_maps]) of
{ok, JsonMap} ->
JsonMap;
Expand Down
2 changes: 1 addition & 1 deletion apps/arweave/src/ar_http_iface_middleware.erl
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ handle(<<"GET">>, [<<"peers">>], Req, _Pid) ->
[
list_to_binary(ar_util:format_peer(P))
||
P <- ar_peers:get_peers(lifetime),
P <- ar_peers:get_peers(current),
P /= ar_http_util:arweave_peer(Req),
ar_peers:is_public_peer(P)
]
Expand Down
Loading

0 comments on commit b479bfc

Please sign in to comment.