From 3159f5380eb03651ba44932e0df43fd47e47a91a Mon Sep 17 00:00:00 2001 From: nolouch Date: Thu, 18 Apr 2019 17:25:44 +0800 Subject: [PATCH 1/3] raft: leader reponnds to learner read index message Signed-off-by: nolouch --- src/raft.rs | 9 +++- tests/integration_cases/test_raft.rs | 69 ++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+), 1 deletion(-) diff --git a/src/raft.rs b/src/raft.rs index c810d6f4a..c57c01884 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -1691,12 +1691,19 @@ impl Raft { } } } - } else { + } else if m.get_from() == INVALID_ID || m.get_from() == self.id { let rs = ReadState { index: self.raft_log.committed, request_ctx: m.take_entries()[0].take_data(), }; self.read_states.push(rs); + } else { + let mut to_send = Message::new(); + to_send.set_to(m.get_from()); + to_send.set_msg_type(MessageType::MsgReadIndexResp); + to_send.set_index(self.raft_log.committed); + to_send.set_entries(m.take_entries()); + self.send(to_send); } return Ok(()); } diff --git a/tests/integration_cases/test_raft.rs b/tests/integration_cases/test_raft.rs index e30d99975..61cf2624f 100644 --- a/tests/integration_cases/test_raft.rs +++ b/tests/integration_cases/test_raft.rs @@ -2189,6 +2189,75 @@ fn test_read_only_option_safe() { } } +#[test] +fn test_read_only_with_learner() { + setup_for_test(); + let a = new_test_learner_raft(1, vec![1], vec![2], 10, 1, new_storage()); + let b = new_test_learner_raft(2, vec![1], vec![2], 10, 1, new_storage()); + + let mut nt = Network::new(vec![Some(a), Some(b)]); + + // we can not let system choose the value of randomizedElectionTimeout + // otherwise it will introduce some uncertainty into this test case + // we need to ensure randomizedElectionTimeout > electionTimeout here + let b_election_timeout = nt.peers[&2].get_election_timeout(); + nt.peers + .get_mut(&2) + .unwrap() + .set_randomized_election_timeout(b_election_timeout + 1); + + for _ in 0..b_election_timeout { + nt.peers.get_mut(&2).unwrap().tick(); + } + nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]); + + assert_eq!(nt.peers[&1].state, StateRole::Leader); + assert_eq!(nt.peers[&2].state, StateRole::Follower); + + let mut tests = vec![ + (1, 10, 11, "ctx1"), + (2, 10, 21, "ctx2"), + (1, 10, 31, "ctx3"), + (2, 10, 41, "ctx4"), + ]; + + for (i, (id, proposals, wri, wctx)) in tests.drain(..).enumerate() { + for _ in 0..proposals { + nt.send(vec![new_message(1, 1, MessageType::MsgPropose, 1)]); + } + + let e = new_entry(0, 0, Some(wctx)); + nt.send(vec![new_message_with_entries( + id, + id, + MessageType::MsgReadIndex, + vec![e], + )]); + + let read_states: Vec = nt + .peers + .get_mut(&id) + .unwrap() + .read_states + .drain(..) + .collect(); + if read_states.is_empty() { + panic!("#{}: read_states is empty, want non-empty", i); + } + let rs = &read_states[0]; + if rs.index != wri { + panic!("#{}: read_index = {}, want {}", i, rs.index, wri) + } + let vec_wctx = wctx.as_bytes().to_vec(); + if rs.request_ctx != vec_wctx { + panic!( + "#{}: request_ctx = {:?}, want {:?}", + i, rs.request_ctx, vec_wctx + ) + } + } +} + #[test] fn test_read_only_option_lease() { setup_for_test(); From c24275dc7648cf7d976497c08e41c8d08c694ecd Mon Sep 17 00:00:00 2001 From: nolouch Date: Fri, 19 Apr 2019 16:39:41 +0800 Subject: [PATCH 2/3] fix ci Signed-off-by: nolouch --- src/raft.rs | 2 +- tests/integration_cases/test_raft.rs | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/raft.rs b/src/raft.rs index c57c01884..2b095186b 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -1698,7 +1698,7 @@ impl Raft { }; self.read_states.push(rs); } else { - let mut to_send = Message::new(); + let mut to_send = Message::default(); to_send.set_to(m.get_from()); to_send.set_msg_type(MessageType::MsgReadIndexResp); to_send.set_index(self.raft_log.committed); diff --git a/tests/integration_cases/test_raft.rs b/tests/integration_cases/test_raft.rs index 61cf2624f..4af3162f1 100644 --- a/tests/integration_cases/test_raft.rs +++ b/tests/integration_cases/test_raft.rs @@ -2215,10 +2215,10 @@ fn test_read_only_with_learner() { assert_eq!(nt.peers[&2].state, StateRole::Follower); let mut tests = vec![ - (1, 10, 11, "ctx1"), - (2, 10, 21, "ctx2"), - (1, 10, 31, "ctx3"), - (2, 10, 41, "ctx4"), + (1, 10, 12, "ctx1"), + (2, 10, 22, "ctx2"), + (1, 10, 32, "ctx3"), + (2, 10, 42, "ctx4"), ]; for (i, (id, proposals, wri, wctx)) in tests.drain(..).enumerate() { From fbc0a9eaaf8a1e40fb00ef29a0e810e145111d0b Mon Sep 17 00:00:00 2001 From: nolouch Date: Wed, 24 Apr 2019 15:21:19 +0800 Subject: [PATCH 3/3] address comments Signed-off-by: nolouch --- src/raft.rs | 29 ++++++++++++++++------------ tests/integration_cases/test_raft.rs | 28 +++++++++++++++------------ 2 files changed, 33 insertions(+), 24 deletions(-) diff --git a/src/raft.rs b/src/raft.rs index 2b095186b..e70d6d086 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -1691,19 +1691,24 @@ impl Raft { } } } - } else if m.get_from() == INVALID_ID || m.get_from() == self.id { - let rs = ReadState { - index: self.raft_log.committed, - request_ctx: m.take_entries()[0].take_data(), - }; - self.read_states.push(rs); } else { - let mut to_send = Message::default(); - to_send.set_to(m.get_from()); - to_send.set_msg_type(MessageType::MsgReadIndexResp); - to_send.set_index(self.raft_log.committed); - to_send.set_entries(m.take_entries()); - self.send(to_send); + // there is only one voting member (the leader) in the cluster + if m.get_from() == INVALID_ID || m.get_from() == self.id { + // from leader itself + let rs = ReadState { + index: self.raft_log.committed, + request_ctx: m.take_entries()[0].take_data(), + }; + self.read_states.push(rs); + } else { + // from learner member + let mut to_send = Message::default(); + to_send.set_to(m.get_from()); + to_send.set_msg_type(MessageType::MsgReadIndexResp); + to_send.set_index(self.raft_log.committed); + to_send.set_entries(m.take_entries()); + self.send(to_send); + } } return Ok(()); } diff --git a/tests/integration_cases/test_raft.rs b/tests/integration_cases/test_raft.rs index 4af3162f1..fdabaed43 100644 --- a/tests/integration_cases/test_raft.rs +++ b/tests/integration_cases/test_raft.rs @@ -2241,20 +2241,24 @@ fn test_read_only_with_learner() { .read_states .drain(..) .collect(); - if read_states.is_empty() { - panic!("#{}: read_states is empty, want non-empty", i); - } + assert_eq!( + read_states.is_empty(), + false, + "#{}: read_states is empty, want non-empty", + i + ); let rs = &read_states[0]; - if rs.index != wri { - panic!("#{}: read_index = {}, want {}", i, rs.index, wri) - } + assert_eq!( + rs.index, wri, + "#{}: read_index = {}, want {}", + i, rs.index, wri + ); let vec_wctx = wctx.as_bytes().to_vec(); - if rs.request_ctx != vec_wctx { - panic!( - "#{}: request_ctx = {:?}, want {:?}", - i, rs.request_ctx, vec_wctx - ) - } + assert_eq!( + rs.request_ctx, vec_wctx, + "#{}: request_ctx = {:?}, want {:?}", + i, rs.request_ctx, vec_wctx + ); } }