Skip to content

Commit ffc842d

Browse files
authored
Censorship resistance test (#531)
1 parent 496ff02 commit ffc842d

12 files changed

+256
-58
lines changed

Cargo.lock

+2-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

consensus/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "aleph-bft"
3-
version = "0.42.0"
3+
version = "0.42.1"
44
edition = "2021"
55
authors = ["Cardinal Cryptography"]
66
categories = ["algorithms", "data-structures", "cryptography", "database"]

consensus/src/testing/behind.rs

+129
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
use std::{
2+
collections::{HashSet, VecDeque},
3+
time::{Duration, Instant},
4+
};
5+
6+
use crate::{
7+
testing::{init_log, spawn_honest_member, HonestMember, NetworkData},
8+
NodeCount, NodeIndex, SpawnHandle,
9+
};
10+
use aleph_bft_mock::{DataProvider, NetworkHook, Router, Spawner};
11+
use futures::StreamExt;
12+
use log::info;
13+
14+
struct Latency {
15+
who: NodeIndex,
16+
buffer: VecDeque<(Instant, (NetworkData, NodeIndex, NodeIndex))>,
17+
}
18+
19+
const LATENCY: Duration = Duration::from_millis(300);
20+
21+
impl Latency {
22+
pub fn new(who: NodeIndex) -> Self {
23+
Latency {
24+
who,
25+
buffer: VecDeque::new(),
26+
}
27+
}
28+
29+
fn add_message(
30+
&mut self,
31+
data: NetworkData,
32+
sender: NodeIndex,
33+
recipient: NodeIndex,
34+
) -> Vec<(NetworkData, NodeIndex, NodeIndex)> {
35+
match sender == self.who || recipient == self.who {
36+
true => {
37+
self.buffer
38+
.push_back((Instant::now(), (data, sender, recipient)));
39+
Vec::new()
40+
}
41+
false => vec![(data, sender, recipient)],
42+
}
43+
}
44+
45+
fn messages_to_send(&mut self) -> Vec<(NetworkData, NodeIndex, NodeIndex)> {
46+
let mut result = Vec::new();
47+
while !self.buffer.is_empty() {
48+
let (when, msg) = self
49+
.buffer
50+
.pop_front()
51+
.expect("just checked it is not empty");
52+
if Instant::now().duration_since(when) < LATENCY {
53+
self.buffer.push_front((when, msg));
54+
break;
55+
}
56+
result.push(msg);
57+
}
58+
result
59+
}
60+
}
61+
62+
impl NetworkHook<NetworkData> for Latency {
63+
fn process_message(
64+
&mut self,
65+
data: NetworkData,
66+
sender: NodeIndex,
67+
recipient: NodeIndex,
68+
) -> Vec<(NetworkData, NodeIndex, NodeIndex)> {
69+
let mut result = self.add_message(data, sender, recipient);
70+
result.append(&mut self.messages_to_send());
71+
result
72+
}
73+
}
74+
75+
#[tokio::test(flavor = "multi_thread")]
76+
async fn delayed_finalized() {
77+
let n_members = NodeCount(7);
78+
let australian = NodeIndex(0);
79+
init_log();
80+
let spawner = Spawner::new();
81+
let mut batch_rxs = Vec::new();
82+
let mut exits = Vec::new();
83+
let mut handles = Vec::new();
84+
let (mut net_hub, networks) = Router::new(n_members);
85+
86+
net_hub.add_hook(Latency::new(australian));
87+
88+
spawner.spawn("network-hub", net_hub);
89+
90+
for (network, _) in networks {
91+
let ix = network.index();
92+
let HonestMember {
93+
finalization_rx,
94+
exit_tx,
95+
handle,
96+
..
97+
} = spawn_honest_member(
98+
spawner,
99+
ix,
100+
n_members,
101+
vec![],
102+
DataProvider::new_range(ix.0 * 50, (ix.0 + 1) * 50),
103+
network,
104+
);
105+
batch_rxs.push(finalization_rx);
106+
exits.push(exit_tx);
107+
handles.push(handle);
108+
}
109+
let to_finalize: HashSet<u32> = (0..((n_members.0) * 50))
110+
.map(|number| number as u32)
111+
.collect();
112+
113+
for mut rx in batch_rxs.drain(..) {
114+
let mut to_finalize_local = to_finalize.clone();
115+
while !to_finalize_local.is_empty() {
116+
let number = rx.next().await.unwrap();
117+
info!("finalizing {}", number);
118+
assert!(to_finalize_local.remove(&number));
119+
}
120+
info!("finished one node");
121+
}
122+
123+
for exit in exits {
124+
let _ = exit.send(());
125+
}
126+
for handle in handles {
127+
let _ = handle.await;
128+
}
129+
}

consensus/src/testing/byzantine.rs

+15-8
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ use crate::{
66
Hasher, Network as NetworkT, NetworkData as NetworkDataT, NodeCount, NodeIndex, NodeMap,
77
Recipient, Round, SessionId, Signed, SpawnHandle, TaskHandle,
88
};
9-
use aleph_bft_mock::{Data, Hash64, Hasher64, Keychain, NetworkHook, Router, Spawner};
9+
use aleph_bft_mock::{
10+
Data, DataProvider, Hash64, Hasher64, Keychain, NetworkHook, Router, Spawner,
11+
};
1012
use futures::{channel::oneshot, StreamExt};
1113
use log::{debug, error, trace};
1214
use parking_lot::Mutex;
@@ -230,7 +232,12 @@ impl AlertHook {
230232
}
231233

232234
impl NetworkHook<NetworkData> for AlertHook {
233-
fn update_state(&mut self, data: &mut NetworkData, sender: NodeIndex, recipient: NodeIndex) {
235+
fn process_message(
236+
&mut self,
237+
data: NetworkData,
238+
sender: NodeIndex,
239+
recipient: NodeIndex,
240+
) -> Vec<(NetworkData, NodeIndex, NodeIndex)> {
234241
use crate::{alerts::AlertMessage::*, network::NetworkDataInner::*};
235242
if let crate::NetworkData(Alert(ForkAlert(_))) = data {
236243
*self
@@ -239,21 +246,21 @@ impl NetworkHook<NetworkData> for AlertHook {
239246
.entry((sender, recipient))
240247
.or_insert(0) += 1;
241248
}
249+
vec![(data, sender, recipient)]
242250
}
243251
}
244252

245253
async fn honest_members_agree_on_batches_byzantine(
246254
n_members: NodeCount,
247255
n_honest: NodeCount,
248256
n_batches: usize,
249-
network_reliability: f64,
250257
) {
251258
init_log();
252259
let spawner = Spawner::new();
253260
let mut batch_rxs = Vec::new();
254261
let mut exits = Vec::new();
255262
let mut handles = Vec::new();
256-
let (mut net_hub, networks) = Router::new(n_members, network_reliability);
263+
let (mut net_hub, networks) = Router::new(n_members);
257264

258265
let alert_hook = AlertHook::new();
259266
net_hub.add_hook(alert_hook.clone());
@@ -270,7 +277,7 @@ async fn honest_members_agree_on_batches_byzantine(
270277
exit_tx,
271278
handle,
272279
..
273-
} = spawn_honest_member(spawner, ix, n_members, vec![], network);
280+
} = spawn_honest_member(spawner, ix, n_members, vec![], DataProvider::new(), network);
274281
batch_rxs.push(finalization_rx);
275282
(exit_tx, handle)
276283
};
@@ -317,17 +324,17 @@ async fn honest_members_agree_on_batches_byzantine(
317324
#[tokio::test(flavor = "multi_thread")]
318325
#[serial]
319326
async fn small_byzantine_one_forker() {
320-
honest_members_agree_on_batches_byzantine(4.into(), 3.into(), 5, 1.0).await;
327+
honest_members_agree_on_batches_byzantine(4.into(), 3.into(), 5).await;
321328
}
322329

323330
#[tokio::test(flavor = "multi_thread")]
324331
#[serial]
325332
async fn small_byzantine_two_forkers() {
326-
honest_members_agree_on_batches_byzantine(7.into(), 5.into(), 5, 1.0).await;
333+
honest_members_agree_on_batches_byzantine(7.into(), 5.into(), 5).await;
327334
}
328335

329336
#[tokio::test(flavor = "multi_thread")]
330337
#[serial]
331338
async fn medium_byzantine_ten_forkers() {
332-
honest_members_agree_on_batches_byzantine(31.into(), 21.into(), 5, 1.0).await;
339+
honest_members_agree_on_batches_byzantine(31.into(), 21.into(), 5).await;
333340
}

consensus/src/testing/crash.rs

+13-10
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,25 @@ use crate::{
22
testing::{init_log, spawn_honest_member, HonestMember},
33
NodeCount, SpawnHandle,
44
};
5-
use aleph_bft_mock::{Router, Spawner};
5+
use aleph_bft_mock::{DataProvider, Router, Spawner, UnreliableHook};
66
use futures::StreamExt;
77
use serial_test::serial;
88

99
async fn honest_members_agree_on_batches(
1010
n_members: NodeCount,
1111
n_alive: NodeCount,
1212
n_batches: usize,
13-
network_reliability: f64,
13+
network_reliability: Option<f64>,
1414
) {
1515
init_log();
1616
let spawner = Spawner::new();
1717
let mut exits = Vec::new();
1818
let mut handles = Vec::new();
1919
let mut batch_rxs = Vec::new();
20-
let (net_hub, networks) = Router::new(n_members, network_reliability);
20+
let (mut net_hub, networks) = Router::new(n_members);
21+
if let Some(reliability) = network_reliability {
22+
net_hub.add_hook(UnreliableHook::new(reliability));
23+
}
2124
spawner.spawn("network-hub", net_hub);
2225

2326
for (network, _) in networks {
@@ -28,7 +31,7 @@ async fn honest_members_agree_on_batches(
2831
exit_tx,
2932
handle,
3033
..
31-
} = spawn_honest_member(spawner, ix, n_members, vec![], network);
34+
} = spawn_honest_member(spawner, ix, n_members, vec![], DataProvider::new(), network);
3235
batch_rxs.push(finalization_rx);
3336
exits.push(exit_tx);
3437
handles.push(handle);
@@ -59,35 +62,35 @@ async fn honest_members_agree_on_batches(
5962
#[tokio::test(flavor = "multi_thread")]
6063
#[serial]
6164
async fn small_honest_all_alive() {
62-
honest_members_agree_on_batches(4.into(), 4.into(), 5, 1.0).await;
65+
honest_members_agree_on_batches(4.into(), 4.into(), 5, None).await;
6366
}
6467

6568
#[tokio::test(flavor = "multi_thread")]
6669
#[serial]
6770
async fn small_honest_one_crash() {
68-
honest_members_agree_on_batches(4.into(), 3.into(), 5, 1.0).await;
71+
honest_members_agree_on_batches(4.into(), 3.into(), 5, None).await;
6972
}
7073

7174
#[tokio::test(flavor = "multi_thread")]
7275
#[serial]
7376
async fn small_honest_one_crash_unreliable_network() {
74-
honest_members_agree_on_batches(4.into(), 3.into(), 5, 0.9).await;
77+
honest_members_agree_on_batches(4.into(), 3.into(), 5, Some(0.9)).await;
7578
}
7679

7780
#[tokio::test(flavor = "multi_thread")]
7881
#[serial]
7982
async fn medium_honest_all_alive() {
80-
honest_members_agree_on_batches(31.into(), 31.into(), 5, 1.0).await;
83+
honest_members_agree_on_batches(31.into(), 31.into(), 5, None).await;
8184
}
8285

8386
#[tokio::test(flavor = "multi_thread")]
8487
#[serial]
8588
async fn medium_honest_ten_crashes() {
86-
honest_members_agree_on_batches(31.into(), 21.into(), 5, 1.0).await;
89+
honest_members_agree_on_batches(31.into(), 21.into(), 5, None).await;
8790
}
8891

8992
#[tokio::test(flavor = "multi_thread")]
9093
#[serial]
9194
async fn medium_honest_ten_crashes_unreliable_network() {
92-
honest_members_agree_on_batches(31.into(), 21.into(), 5, 0.9).await;
95+
honest_members_agree_on_batches(31.into(), 21.into(), 5, Some(0.9)).await;
9396
}

consensus/src/testing/crash_recovery.rs

+19-5
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::{
33
units::{UncheckedSignedUnit, Unit, UnitCoord},
44
NodeCount, NodeIndex, SpawnHandle, TaskHandle,
55
};
6-
use aleph_bft_mock::{Data, Hasher64, Router, Signature, Spawner};
6+
use aleph_bft_mock::{Data, DataProvider, Hasher64, Router, Signature, Spawner};
77
use codec::Decode;
88
use futures::{
99
channel::{mpsc, oneshot},
@@ -67,7 +67,14 @@ fn connect_nodes(
6767
saved_state,
6868
exit_tx,
6969
handle,
70-
} = spawn_honest_member(*spawner, ix, n_members, vec![], network);
70+
} = spawn_honest_member(
71+
*spawner,
72+
ix,
73+
n_members,
74+
vec![],
75+
DataProvider::new(),
76+
network,
77+
);
7178
(
7279
ix,
7380
NodeData {
@@ -109,7 +116,14 @@ async fn reconnect_nodes(
109116
saved_state,
110117
exit_tx,
111118
handle,
112-
} = spawn_honest_member(*spawner, *node_id, n_members, saved_units.clone(), network);
119+
} = spawn_honest_member(
120+
*spawner,
121+
*node_id,
122+
n_members,
123+
saved_units.clone(),
124+
DataProvider::new(),
125+
network,
126+
);
113127
reconnected_nodes.push((
114128
*node_id,
115129
NodeData {
@@ -166,7 +180,7 @@ async fn crashed_nodes_recover(n_members: NodeCount, n_batches: usize) {
166180

167181
let n_kill = (n_members - n_members.consensus_threshold()) + 1.into();
168182
let spawner = Spawner::new();
169-
let (net_hub, networks) = Router::new(n_members, 1.0);
183+
let (net_hub, networks) = Router::new(n_members);
170184
spawner.spawn("network-hub", net_hub);
171185

172186
let mut node_data = connect_nodes(&spawner, n_members, networks);
@@ -239,7 +253,7 @@ async fn saves_units_properly() {
239253
let n_batches = 2;
240254
let n_members = NodeCount(4);
241255
let spawner = Spawner::new();
242-
let (net_hub, networks) = Router::new(n_members, 1.0);
256+
let (net_hub, networks) = Router::new(n_members);
243257
spawner.spawn("network-hub", net_hub);
244258

245259
let mut node_data = connect_nodes(&spawner, n_members, networks);

consensus/src/testing/mod.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
mod alerts;
2+
mod behind;
23
mod byzantine;
34
mod crash;
45
mod crash_recovery;
@@ -67,9 +68,9 @@ pub fn spawn_honest_member(
6768
node_index: NodeIndex,
6869
n_members: NodeCount,
6970
units: Vec<u8>,
71+
data_provider: DataProvider,
7072
network: impl 'static + NetworkT<NetworkData>,
7173
) -> HonestMember {
72-
let data_provider = DataProvider::new();
7374
let (finalization_handler, finalization_rx) = FinalizationHandler::new();
7475
let config = gen_config(node_index, n_members, gen_delay_config());
7576
let (exit_tx, exit_rx) = oneshot::channel();

0 commit comments

Comments
 (0)