Skip to content

Commit 3159f53

Browse files
committed
raft: leader reponnds to learner read index message
Signed-off-by: nolouch <nolouch@gmail.com>
1 parent b19e830 commit 3159f53

File tree

2 files changed

+77
-1
lines changed

2 files changed

+77
-1
lines changed

src/raft.rs

+8-1
Original file line numberDiff line numberDiff line change
@@ -1691,12 +1691,19 @@ impl<T: Storage> Raft<T> {
16911691
}
16921692
}
16931693
}
1694-
} else {
1694+
} else if m.get_from() == INVALID_ID || m.get_from() == self.id {
16951695
let rs = ReadState {
16961696
index: self.raft_log.committed,
16971697
request_ctx: m.take_entries()[0].take_data(),
16981698
};
16991699
self.read_states.push(rs);
1700+
} else {
1701+
let mut to_send = Message::new();
1702+
to_send.set_to(m.get_from());
1703+
to_send.set_msg_type(MessageType::MsgReadIndexResp);
1704+
to_send.set_index(self.raft_log.committed);
1705+
to_send.set_entries(m.take_entries());
1706+
self.send(to_send);
17001707
}
17011708
return Ok(());
17021709
}

tests/integration_cases/test_raft.rs

+69
Original file line numberDiff line numberDiff line change
@@ -2189,6 +2189,75 @@ fn test_read_only_option_safe() {
21892189
}
21902190
}
21912191

2192+
#[test]
2193+
fn test_read_only_with_learner() {
2194+
setup_for_test();
2195+
let a = new_test_learner_raft(1, vec![1], vec![2], 10, 1, new_storage());
2196+
let b = new_test_learner_raft(2, vec![1], vec![2], 10, 1, new_storage());
2197+
2198+
let mut nt = Network::new(vec![Some(a), Some(b)]);
2199+
2200+
// we can not let system choose the value of randomizedElectionTimeout
2201+
// otherwise it will introduce some uncertainty into this test case
2202+
// we need to ensure randomizedElectionTimeout > electionTimeout here
2203+
let b_election_timeout = nt.peers[&2].get_election_timeout();
2204+
nt.peers
2205+
.get_mut(&2)
2206+
.unwrap()
2207+
.set_randomized_election_timeout(b_election_timeout + 1);
2208+
2209+
for _ in 0..b_election_timeout {
2210+
nt.peers.get_mut(&2).unwrap().tick();
2211+
}
2212+
nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);
2213+
2214+
assert_eq!(nt.peers[&1].state, StateRole::Leader);
2215+
assert_eq!(nt.peers[&2].state, StateRole::Follower);
2216+
2217+
let mut tests = vec![
2218+
(1, 10, 11, "ctx1"),
2219+
(2, 10, 21, "ctx2"),
2220+
(1, 10, 31, "ctx3"),
2221+
(2, 10, 41, "ctx4"),
2222+
];
2223+
2224+
for (i, (id, proposals, wri, wctx)) in tests.drain(..).enumerate() {
2225+
for _ in 0..proposals {
2226+
nt.send(vec![new_message(1, 1, MessageType::MsgPropose, 1)]);
2227+
}
2228+
2229+
let e = new_entry(0, 0, Some(wctx));
2230+
nt.send(vec![new_message_with_entries(
2231+
id,
2232+
id,
2233+
MessageType::MsgReadIndex,
2234+
vec![e],
2235+
)]);
2236+
2237+
let read_states: Vec<ReadState> = nt
2238+
.peers
2239+
.get_mut(&id)
2240+
.unwrap()
2241+
.read_states
2242+
.drain(..)
2243+
.collect();
2244+
if read_states.is_empty() {
2245+
panic!("#{}: read_states is empty, want non-empty", i);
2246+
}
2247+
let rs = &read_states[0];
2248+
if rs.index != wri {
2249+
panic!("#{}: read_index = {}, want {}", i, rs.index, wri)
2250+
}
2251+
let vec_wctx = wctx.as_bytes().to_vec();
2252+
if rs.request_ctx != vec_wctx {
2253+
panic!(
2254+
"#{}: request_ctx = {:?}, want {:?}",
2255+
i, rs.request_ctx, vec_wctx
2256+
)
2257+
}
2258+
}
2259+
}
2260+
21922261
#[test]
21932262
fn test_read_only_option_lease() {
21942263
setup_for_test();

0 commit comments

Comments
 (0)