@@ -22,7 +22,7 @@ pub struct GossipService<T> {
22
22
/// Peer sampling service
23
23
peer_sampling_service : Arc < Mutex < PeerSamplingService > > ,
24
24
/// Configuration for gossip
25
- gossip_config : GossipConfig ,
25
+ gossip_config : Arc < GossipConfig > ,
26
26
/// Shutdown requested flag
27
27
shutdown : Arc < AtomicBool > ,
28
28
/// Thread handles
@@ -48,7 +48,7 @@ where T: UpdateHandler + 'static + Send
48
48
address,
49
49
peer_sampling_service : Arc :: new ( Mutex :: new ( PeerSamplingService :: new ( address, peer_sampling_config) ) ) ,
50
50
updates : Arc :: new ( RwLock :: new ( UpdateDecorator :: new ( gossip_config. update_expiration ( ) . clone ( ) ) ) ) ,
51
- gossip_config,
51
+ gossip_config : Arc :: new ( gossip_config ) ,
52
52
shutdown : Arc :: new ( AtomicBool :: new ( false ) ) ,
53
53
activities : Vec :: new ( ) ,
54
54
update_handler : Arc :: new ( Mutex :: new ( None ) ) ,
@@ -107,8 +107,7 @@ where T: UpdateHandler + 'static + Send
107
107
}
108
108
109
109
fn start_message_header_handler ( & mut self , receiver : Receiver < HeaderMessage > ) -> Result < ( ) , Box < dyn Error > > {
110
- let push = self . gossip_config . is_push ( ) ;
111
- let pull = self . gossip_config . is_pull ( ) ;
110
+ let gossip_config_arc = Arc :: clone ( & self . gossip_config ) ;
112
111
let address = self . address . to_string ( ) ;
113
112
let updates_arc = Arc :: clone ( & self . updates ) ;
114
113
let handle = std:: thread:: Builder :: new ( ) . name ( format ! ( "{} - header receiver" , address) ) . spawn ( move || {
@@ -120,7 +119,7 @@ where T: UpdateHandler + 'static + Send
120
119
let updates = updates_arc. read ( ) . unwrap ( ) ;
121
120
122
121
// Response with message headers if pull is enabled
123
- if pull && updates. active_count ( ) > 0 && * message. message_type ( ) == MessageType :: Request {
122
+ if gossip_config_arc . is_pull ( ) && updates. active_count ( ) > 0 && * message. message_type ( ) == MessageType :: Request {
124
123
let mut response = HeaderMessage :: new_response ( address. clone ( ) ) ;
125
124
response. set_headers ( updates. active_headers ( ) ) ;
126
125
match crate :: network:: send ( & sender_address, Box :: new ( response) ) {
@@ -130,7 +129,7 @@ where T: UpdateHandler + 'static + Send
130
129
}
131
130
132
131
// Process message if (request and push enabled) or (response and pull enabled)
133
- if * message. message_type ( ) == MessageType :: Request && push || * message. message_type ( ) == MessageType :: Response && pull {
132
+ if * message. message_type ( ) == MessageType :: Request && gossip_config_arc . is_push ( ) || * message. message_type ( ) == MessageType :: Response && gossip_config_arc . is_pull ( ) {
134
133
135
134
let mut new_digests = HashMap :: new ( ) ;
136
135
message. headers ( ) . iter ( ) . for_each ( |digest| {
@@ -233,11 +232,9 @@ where T: UpdateHandler + 'static + Send
233
232
}
234
233
235
234
fn start_gossip_activity ( & mut self ) -> Result < ( ) , Box < dyn Error > > {
236
- let push = self . gossip_config . is_push ( ) ;
235
+ let gossip_config_arc = Arc :: clone ( & self . gossip_config ) ;
237
236
let node_address = self . address . to_string ( ) ;
238
237
let shutdown_requested = Arc :: clone ( & self . shutdown ) ;
239
- let gossip_period = self . gossip_config . gossip_period ( ) ;
240
- let gossip_deviation = self . gossip_config . gossip_deviation ( ) ;
241
238
let peer_sampling_arc = Arc :: clone ( & self . peer_sampling_service ) ;
242
239
let updates_arc = Arc :: clone ( & self . updates ) ;
243
240
let handle = std:: thread:: Builder :: new ( ) . name ( format ! ( "{} - gossip activity" , self . address( ) . to_string( ) ) ) . spawn ( move ||{
@@ -248,17 +245,17 @@ where T: UpdateHandler + 'static + Send
248
245
}
249
246
250
247
let deviation =
251
- if gossip_deviation == 0 { 0 }
252
- else { rand:: thread_rng ( ) . gen_range ( 0 , gossip_deviation) } ;
253
- let sleep = gossip_period + deviation;
248
+ if gossip_config_arc . gossip_deviation ( ) == 0 { 0 }
249
+ else { rand:: thread_rng ( ) . gen_range ( 0 , gossip_config_arc . gossip_deviation ( ) ) } ;
250
+ let sleep = gossip_config_arc . gossip_period ( ) + deviation;
254
251
std:: thread:: sleep ( std:: time:: Duration :: from_millis ( sleep) ) ;
255
252
256
253
let mut peer_sampling_service = peer_sampling_arc. lock ( ) . unwrap ( ) ;
257
254
if let Some ( peer) = peer_sampling_service. get_peer ( ) {
258
255
if let Ok ( peer_address) = peer. address ( ) . parse :: < SocketAddr > ( ) {
259
256
drop ( peer_sampling_service) ;
260
257
let mut message = HeaderMessage :: new_request ( node_address. to_string ( ) ) ;
261
- if push {
258
+ if gossip_config_arc . is_push ( ) {
262
259
// send active headers
263
260
let mut updates = updates_arc. write ( ) . unwrap ( ) ;
264
261
0 commit comments