From dbac91d5dc26fd92a8ac5038220176823d9d6d29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Tue, 4 Apr 2023 09:37:41 +0800 Subject: [PATCH] Improve: send AppendEntries response before committing entries When a follower receives an append-entries request that includes a series of log entries to append and the log id that the leader has committed, it responds with an append-entries response after committing and applying the entries. However, this is not strictly necessary. The follower could simply send the response as soon as the log entries have been appended and flushed to disk, without waiting for them to be committed. --- openraft/src/core/raft_core.rs | 8 +- openraft/src/display_ext.rs | 6 +- openraft/src/engine/engine_impl.rs | 58 +++++++++--- .../following_handler/append_entries_test.rs | 26 ++---- .../following_handler/commit_entries_test.rs | 3 +- .../engine/handler/following_handler/mod.rs | 53 ++++++----- openraft/src/engine/mod.rs | 2 +- ...ies_req_test.rs => append_entries_test.rs} | 92 +++++++++---------- openraft/src/error.rs | 35 +++++++ 9 files changed, 169 insertions(+), 114 deletions(-) rename openraft/src/engine/tests/{handle_append_entries_req_test.rs => append_entries_test.rs} (76%) diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 32b542fc5..a414c3836 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -948,11 +948,11 @@ impl, S: RaftStorage> RaftCore` and slice `&[T]`. +//! Implement [`fmt::Display`] for types such as `Option` and slice `&[T]`. use std::fmt; @@ -9,7 +9,7 @@ use std::fmt; pub(crate) struct DisplayOption<'a, T: fmt::Display>(pub &'a Option); impl<'a, T: fmt::Display> fmt::Display for DisplayOption<'a, T> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match &self.0 { None => { write!(f, "None") @@ -26,7 +26,7 @@ impl<'a, T: fmt::Display> fmt::Display for DisplayOption<'a, T> { pub(crate) struct DisplaySlice<'a, T: fmt::Display, const MAX: usize = 5>(pub &'a [T]); impl<'a, T: fmt::Display, const MAX: usize> fmt::Display for DisplaySlice<'a, T, MAX> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let slice = self.0; let len = slice.len(); diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index a31fe88ef..678f7f35a 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -17,15 +17,18 @@ use crate::engine::time_state; use crate::engine::time_state::TimeState; use crate::engine::Command; use crate::engine::EngineOutput; +use crate::engine::SendResult; use crate::entry::RaftEntry; use crate::error::ForwardToLeader; use crate::error::InitializeError; use crate::error::NotAllowed; use crate::error::NotInMembers; +use crate::error::RejectAppendEntries; use crate::internal_server_state::InternalServerState; use crate::membership::EffectiveMembership; use crate::node::Node; use crate::raft::AppendEntriesResponse; +use crate::raft::AppendEntriesTx; use crate::raft::RaftRespTx; use crate::raft::VoteRequest; use crate::raft::VoteResponse; @@ -363,36 +366,65 @@ where /// /// Also clean conflicting entries and update membership state. #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn handle_append_entries_req( + pub(crate) fn handle_append_entries( &mut self, vote: &Vote, prev_log_id: Option>, entries: Vec, - leader_committed: Option>, - ) -> AppendEntriesResponse { + tx: Option>, + ) -> bool { tracing::debug!( vote = display(vote), prev_log_id = display(prev_log_id.summary()), entries = display(DisplaySlice::<_>(&entries)), - leader_committed = display(leader_committed.summary()), - "append-entries request" - ); - tracing::debug!( my_vote = display(self.state.vote_ref()), my_last_log_id = display(self.state.last_log_id().summary()), - my_committed = display(self.state.committed().summary()), - "local state" + "{}", + func_name!() ); - let res = self.vote_handler().handle_message_vote(vote); - if let Err(rejected) = res { - return rejected.into(); + let res = self.append_entries(vote, prev_log_id, entries); + let is_ok = res.is_ok(); + + if let Some(tx) = tx { + let resp: AppendEntriesResponse = res.into(); + self.output.push_command(Command::SendAppendEntriesResult { + send: SendResult::new(Ok(resp), tx), + }); } + is_ok + } + + pub(crate) fn append_entries( + &mut self, + vote: &Vote, + prev_log_id: Option>, + entries: Vec, + ) -> Result<(), RejectAppendEntries> { + self.vote_handler().handle_message_vote(vote)?; // Vote is legal. let mut fh = self.following_handler(); - fh.append_entries(prev_log_id, entries, leader_committed) + fh.ensure_log_consecutive(prev_log_id)?; + fh.append_entries(prev_log_id, entries); + + Ok(()) + } + + /// Commit entries for follower/learner. + #[tracing::instrument(level = "debug", skip_all)] + pub(crate) fn handle_commit_entries(&mut self, leader_committed: Option>) { + tracing::debug!( + leader_committed = display(leader_committed.summary()), + my_accepted = display(self.state.accepted().summary()), + my_committed = display(self.state.committed().summary()), + "{}", + func_name!() + ); + + let mut fh = self.following_handler(); + fh.commit_entries(leader_committed); } /// Leader steps down(convert to learner) once the membership not containing it is committed. diff --git a/openraft/src/engine/handler/following_handler/append_entries_test.rs b/openraft/src/engine/handler/following_handler/append_entries_test.rs index 8d800a546..88d5a5fd6 100644 --- a/openraft/src/engine/handler/following_handler/append_entries_test.rs +++ b/openraft/src/engine/handler/following_handler/append_entries_test.rs @@ -41,15 +41,11 @@ fn eng() -> Engine::Entry> { fn test_follower_append_entries_update_accepted() -> anyhow::Result<()> { let mut eng = eng(); - eng.following_handler().append_entries( - Some(log_id(2, 3)), - vec![ - // - blank_ent(3, 4), - blank_ent(3, 5), - ], - None, - ); + eng.following_handler().append_entries(Some(log_id(2, 3)), vec![ + // + blank_ent(3, 4), + blank_ent(3, 5), + ]); assert_eq!( &[ @@ -64,14 +60,10 @@ fn test_follower_append_entries_update_accepted() -> anyhow::Result<()> { // Update again, accept should not decrease. - eng.following_handler().append_entries( - Some(log_id(2, 3)), - vec![ - // - blank_ent(3, 4), - ], - None, - ); + eng.following_handler().append_entries(Some(log_id(2, 3)), vec![ + // + blank_ent(3, 4), + ]); assert_eq!(Some(&log_id(3, 5)), eng.state.last_log_id()); assert_eq!(Some(&log_id(3, 5)), eng.state.accepted()); diff --git a/openraft/src/engine/handler/following_handler/commit_entries_test.rs b/openraft/src/engine/handler/following_handler/commit_entries_test.rs index 9ebc39588..dab6eb43e 100644 --- a/openraft/src/engine/handler/following_handler/commit_entries_test.rs +++ b/openraft/src/engine/handler/following_handler/commit_entries_test.rs @@ -97,10 +97,11 @@ fn test_following_handler_commit_entries_le_accepted() -> anyhow::Result<()> { ); assert_eq!( vec![ + // Command::FollowerCommit { already_committed: Some(log_id(1, 1)), upto: log_id(2, 3) - }, // + }, ], eng.output.take_commands() ); diff --git a/openraft/src/engine/handler/following_handler/mod.rs b/openraft/src/engine/handler/following_handler/mod.rs index 4229548ba..f9b5734d0 100644 --- a/openraft/src/engine/handler/following_handler/mod.rs +++ b/openraft/src/engine/handler/following_handler/mod.rs @@ -9,7 +9,7 @@ use crate::engine::Command; use crate::engine::EngineConfig; use crate::engine::EngineOutput; use crate::entry::RaftEntry; -use crate::raft::AppendEntriesResponse; +use crate::error::RejectAppendEntries; use crate::raft_state::LogStateReader; use crate::EffectiveMembership; use crate::LogId; @@ -52,16 +52,10 @@ where /// /// Also clean conflicting entries and update membership state. #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn append_entries( - &mut self, - prev_log_id: Option>, - entries: Vec, - leader_committed: Option>, - ) -> AppendEntriesResponse { + pub(crate) fn append_entries(&mut self, prev_log_id: Option>, entries: Vec) { tracing::debug!( prev_log_id = display(prev_log_id.summary()), entries = display(DisplaySlice::<_>(&entries)), - leader_committed = display(leader_committed.summary()), "append-entries request" ); tracing::debug!( @@ -74,18 +68,6 @@ where debug_assert!(x.get_log_id().index == prev_log_id.next_index()); } - if let Some(ref prev) = prev_log_id { - if !self.state.has_log_id(prev) { - let local = self.state.get_log_id(prev.index); - tracing::debug!(local = display(local.summary()), "prev_log_id does not match"); - - self.truncate_logs(prev.index); - return AppendEntriesResponse::Conflict; - } - } - - // else `prev_log_id.is_none()` means replicating logs from the very beginning. - tracing::debug!( committed = display(self.state.committed().summary()), entries = display(DisplaySlice::<_>(&entries)), @@ -107,10 +89,28 @@ where } self.do_append_entries(entries, since); + } - self.commit_entries(leader_committed); + /// Ensures the log to replicate is consecutive to the local log. + /// + /// If not, truncate the local log and return an error. + pub(crate) fn ensure_log_consecutive( + &mut self, + prev_log_id: Option>, + ) -> Result<(), RejectAppendEntries> { + if let Some(ref prev) = prev_log_id { + if !self.state.has_log_id(prev) { + let local = self.state.get_log_id(prev.index); + tracing::debug!(local = display(DisplayOption(&local)), "prev_log_id does not match"); + + self.truncate_logs(prev.index); + return Err(RejectAppendEntries::ByConflictingLogId { local, expect: *prev }); + } + } + + // else `prev_log_id.is_none()` means replicating logs from the very beginning. - AppendEntriesResponse::Success + Ok(()) } /// Follower/Learner appends `entries[since..]`. @@ -143,8 +143,9 @@ where self.output.push_command(Command::AppendInputEntries { entries }); } + /// Commit entries that are already committed by the leader. #[tracing::instrument(level = "debug", skip_all)] - fn commit_entries(&mut self, leader_committed: Option>) { + pub(crate) fn commit_entries(&mut self, leader_committed: Option>) { let accepted = self.state.accepted().copied(); let committed = std::cmp::min(accepted, leader_committed); @@ -232,7 +233,9 @@ where let m = Arc::new(membership); - // TODO: if effective membership changes, call `update_repliation()` + // TODO: if effective membership changes, call `update_replication()`, if a follower has replication + // streams. Now we don't have replication streams for follower, so it's ok to not call + // `update_replication()`. let effective_changed = self.state.membership_state.update_committed(m); if let Some(c) = effective_changed { self.output.push_command(Command::UpdateMembership { membership: c }) @@ -247,6 +250,7 @@ where // There are two special cases in which snapshot last log id does not exists locally: // Snapshot last log id before the local last-purged-log-id, or after the local last-log-id: // + // ``` // snapshot ----. // v // -----------------------llllllllll---> @@ -254,6 +258,7 @@ where // snapshot ----. // v // ----lllllllllll---------------------> + // ``` // // In the first case, snapshot-last-log-id <= last-purged-log-id <= // local-snapshot-last-log-id. Thus snapshot is obsolete and won't be installed. diff --git a/openraft/src/engine/mod.rs b/openraft/src/engine/mod.rs index 12ebef788..1896cf6f1 100644 --- a/openraft/src/engine/mod.rs +++ b/openraft/src/engine/mod.rs @@ -38,9 +38,9 @@ pub(crate) mod time_state; #[cfg(test)] mod tests { + mod append_entries_test; mod command_test; mod elect_test; - mod handle_append_entries_req_test; mod handle_vote_req_test; mod handle_vote_resp_test; mod initialize_test; diff --git a/openraft/src/engine/tests/handle_append_entries_req_test.rs b/openraft/src/engine/tests/append_entries_test.rs similarity index 76% rename from openraft/src/engine/tests/handle_append_entries_req_test.rs rename to openraft/src/engine/tests/append_entries_test.rs index 18100a2fb..1f6d22343 100644 --- a/openraft/src/engine/tests/handle_append_entries_req_test.rs +++ b/openraft/src/engine/tests/append_entries_test.rs @@ -11,7 +11,7 @@ use crate::engine::CEngine; use crate::engine::Command; use crate::engine::Engine; use crate::entry::RaftEntry; -use crate::raft::AppendEntriesResponse; +use crate::error::RejectAppendEntries; use crate::raft_state::LogStateReader; use crate::testing::log_id; use crate::utime::UTime; @@ -51,12 +51,12 @@ fn eng() -> CEngine { } #[test] -fn test_handle_append_entries_req_vote_is_rejected() -> anyhow::Result<()> { +fn test_append_entries_vote_is_rejected() -> anyhow::Result<()> { let mut eng = eng(); - let resp = eng.handle_append_entries_req(&Vote::new(1, 1), None, Vec::>::new(), None); + let res = eng.append_entries(&Vote::new(1, 1), None, Vec::>::new()); - assert_eq!(AppendEntriesResponse::HigherVote(Vote::new(2, 1)), resp); + assert_eq!(Err(RejectAppendEntries::ByVote(Vote::new(2, 1))), res); assert_eq!( &[ log_id(1, 1), // @@ -66,7 +66,6 @@ fn test_handle_append_entries_req_vote_is_rejected() -> anyhow::Result<()> { ); assert_eq!(Vote::new(2, 1), *eng.state.vote_ref()); assert_eq!(Some(&log_id(2, 3)), eng.state.last_log_id()); - assert_eq!(Some(&log_id(0, 0)), eng.state.committed()); assert_eq!( MembershipState::new( Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m01())), @@ -81,20 +80,19 @@ fn test_handle_append_entries_req_vote_is_rejected() -> anyhow::Result<()> { } #[test] -fn test_handle_append_entries_req_prev_log_id_is_applied() -> anyhow::Result<()> { +fn test_append_entries_prev_log_id_is_applied() -> anyhow::Result<()> { // An applied log id has to be committed thus let mut eng = eng(); eng.state.vote = UTime::new(Instant::now(), Vote::new(1, 2)); eng.vote_handler().become_leading(); - let resp = eng.handle_append_entries_req( + let res = eng.append_entries( &Vote::new_committed(2, 1), Some(log_id(0, 0)), Vec::>::new(), - None, ); - assert_eq!(AppendEntriesResponse::Success, resp); + assert_eq!(Ok(()), res); assert_eq!( &[ log_id(1, 1), // @@ -104,7 +102,6 @@ fn test_handle_append_entries_req_prev_log_id_is_applied() -> anyhow::Result<()> ); assert_eq!(Vote::new_committed(2, 1), *eng.state.vote_ref()); assert_eq!(Some(&log_id(2, 3)), eng.state.last_log_id()); - assert_eq!(Some(&log_id(0, 0)), eng.state.committed()); assert_eq!( MembershipState::new( Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m01())), @@ -124,17 +121,22 @@ fn test_handle_append_entries_req_prev_log_id_is_applied() -> anyhow::Result<()> } #[test] -fn test_handle_append_entries_req_prev_log_id_conflict() -> anyhow::Result<()> { +fn test_append_entries_prev_log_id_conflict() -> anyhow::Result<()> { let mut eng = eng(); - let resp = eng.handle_append_entries_req( + let res = eng.append_entries( &Vote::new_committed(2, 1), Some(log_id(2, 2)), Vec::>::new(), - None, ); - assert_eq!(AppendEntriesResponse::Conflict, resp); + assert_eq!( + Err(RejectAppendEntries::ByConflictingLogId { + expect: log_id(2, 2), + local: Some(log_id(1, 2)), + }), + res + ); assert_eq!( &[ log_id(1, 1), // @@ -143,7 +145,6 @@ fn test_handle_append_entries_req_prev_log_id_conflict() -> anyhow::Result<()> { ); assert_eq!(Vote::new_committed(2, 1), *eng.state.vote_ref()); assert_eq!(Some(&log_id(1, 1)), eng.state.last_log_id()); - assert_eq!(Some(&log_id(0, 0)), eng.state.committed()); assert_eq!( MembershipState::new( Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m01())), @@ -169,17 +170,15 @@ fn test_handle_append_entries_req_prev_log_id_conflict() -> anyhow::Result<()> { } #[test] -fn test_handle_append_entries_req_prev_log_id_is_committed() -> anyhow::Result<()> { +fn test_append_entries_prev_log_id_is_committed() -> anyhow::Result<()> { let mut eng = eng(); - let resp = eng.handle_append_entries_req( - &Vote::new_committed(2, 1), - Some(log_id(0, 0)), - vec![blank_ent(1, 1), blank_ent(2, 2)], - Some(log_id(1, 1)), - ); + let res = eng.append_entries(&Vote::new_committed(2, 1), Some(log_id(0, 0)), vec![ + blank_ent(1, 1), + blank_ent(2, 2), + ]); - assert_eq!(AppendEntriesResponse::Success, resp); + assert_eq!(Ok(()), res); assert_eq!( &[ log_id(1, 1), // @@ -189,7 +188,6 @@ fn test_handle_append_entries_req_prev_log_id_is_committed() -> anyhow::Result<( ); assert_eq!(Vote::new_committed(2, 1), *eng.state.vote_ref()); assert_eq!(Some(&log_id(2, 2)), eng.state.last_log_id()); - assert_eq!(Some(&log_id(1, 1)), eng.state.committed()); assert_eq!( MembershipState::new( Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m01())), @@ -210,10 +208,6 @@ fn test_handle_append_entries_req_prev_log_id_is_committed() -> anyhow::Result<( Command::AppendInputEntries { entries: vec![blank_ent(2, 2)] }, - Command::FollowerCommit { - already_committed: Some(log_id(0, 0)), - upto: log_id(1, 1) - }, ], eng.output.take_commands() ); @@ -222,19 +216,23 @@ fn test_handle_append_entries_req_prev_log_id_is_committed() -> anyhow::Result<( } #[test] -fn test_handle_append_entries_req_prev_log_id_not_exists() -> anyhow::Result<()> { +fn test_append_entries_prev_log_id_not_exists() -> anyhow::Result<()> { let mut eng = eng(); eng.state.vote = UTime::new(Instant::now(), Vote::new(1, 2)); eng.vote_handler().become_leading(); - let resp = eng.handle_append_entries_req( - &Vote::new_committed(2, 1), - Some(log_id(2, 4)), - vec![blank_ent(2, 5), blank_ent(2, 6)], - Some(log_id(1, 1)), - ); + let res = eng.append_entries(&Vote::new_committed(2, 1), Some(log_id(2, 4)), vec![ + blank_ent(2, 5), + blank_ent(2, 6), + ]); - assert_eq!(AppendEntriesResponse::Conflict, resp); + assert_eq!( + Err(RejectAppendEntries::ByConflictingLogId { + expect: log_id(2, 4), + local: None, + }), + res + ); assert_eq!( &[ log_id(1, 1), // @@ -244,7 +242,6 @@ fn test_handle_append_entries_req_prev_log_id_not_exists() -> anyhow::Result<()> ); assert_eq!(Vote::new_committed(2, 1), *eng.state.vote_ref()); assert_eq!(Some(&log_id(2, 3)), eng.state.last_log_id()); - assert_eq!(Some(&log_id(0, 0)), eng.state.committed()); assert_eq!( MembershipState::new( Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m01())), @@ -264,7 +261,7 @@ fn test_handle_append_entries_req_prev_log_id_not_exists() -> anyhow::Result<()> } #[test] -fn test_handle_append_entries_req_entries_conflict() -> anyhow::Result<()> { +fn test_append_entries_conflict() -> anyhow::Result<()> { // prev_log_id matches, // The second entry in entries conflict. // This request will replace the effective membership. @@ -272,14 +269,12 @@ fn test_handle_append_entries_req_entries_conflict() -> anyhow::Result<()> { // It is no longer a member, change to learner let mut eng = eng(); - let resp = eng.handle_append_entries_req( - &Vote::new_committed(2, 1), - Some(log_id(1, 1)), - vec![blank_ent(1, 2), Entry::new_membership(log_id(3, 3), m34())], - Some(log_id(4, 4)), - ); + let resp = eng.append_entries(&Vote::new_committed(2, 1), Some(log_id(1, 1)), vec![ + blank_ent(1, 2), + Entry::new_membership(log_id(3, 3), m34()), + ]); - assert_eq!(AppendEntriesResponse::Success, resp); + assert_eq!(Ok(()), resp); assert_eq!( &[ log_id(1, 1), // @@ -289,10 +284,9 @@ fn test_handle_append_entries_req_entries_conflict() -> anyhow::Result<()> { ); assert_eq!(Vote::new_committed(2, 1), *eng.state.vote_ref()); assert_eq!(Some(&log_id(3, 3)), eng.state.last_log_id()); - assert_eq!(Some(&log_id(3, 3)), eng.state.committed()); assert_eq!( MembershipState::new( - Arc::new(EffectiveMembership::new(Some(log_id(3, 3)), m34())), + Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m01())), Arc::new(EffectiveMembership::new(Some(log_id(3, 3)), m34())), ), eng.state.membership_state @@ -313,10 +307,6 @@ fn test_handle_append_entries_req_entries_conflict() -> anyhow::Result<()> { Command::AppendInputEntries { entries: vec![Entry::new_membership(log_id(3, 3), m34())] }, - Command::FollowerCommit { - already_committed: Some(log_id(0, 0)), - upto: log_id(3, 3) - }, ], eng.output.take_commands() ); diff --git a/openraft/src/error.rs b/openraft/src/error.rs index 2f652565a..da787a429 100644 --- a/openraft/src/error.rs +++ b/openraft/src/error.rs @@ -467,3 +467,38 @@ impl From> for AppendEntriesResponse { } } } + +#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] +pub(crate) enum RejectAppendEntries { + #[error("reject AppendEntries by a greater vote: {0}")] + ByVote(Vote), + + #[error("reject AppendEntries because of conflicting log-id: {local:?}; expect to be: {expect:?}")] + ByConflictingLogId { + expect: LogId, + local: Option>, + }, +} + +impl From> for RejectAppendEntries { + fn from(r: RejectVoteRequest) -> Self { + match r { + RejectVoteRequest::ByVote(v) => RejectAppendEntries::ByVote(v), + RejectVoteRequest::ByLastLogId(_) => { + unreachable!("the leader should always has a greater last log id") + } + } + } +} + +impl From>> for AppendEntriesResponse { + fn from(r: Result<(), RejectAppendEntries>) -> Self { + match r { + Ok(_) => AppendEntriesResponse::Success, + Err(e) => match e { + RejectAppendEntries::ByVote(v) => AppendEntriesResponse::HigherVote(v), + RejectAppendEntries::ByConflictingLogId { expect: _, local: _ } => AppendEntriesResponse::Conflict, + }, + } + } +}