From ab6689d951954e3adbe8eb427364cf9062da1425 Mon Sep 17 00:00:00 2001 From: drdr xp Date: Sun, 29 Aug 2021 17:31:15 +0800 Subject: [PATCH] feature: RaftStore::get_last_log_id() to get the last known log id in log or state machine --- async-raft/src/core/append_entries.rs | 20 ++-- async-raft/src/storage.rs | 24 ++++- memstore/src/lib.rs | 36 ++++++- memstore/src/test.rs | 132 ++++++++++++++++++++++++++ 4 files changed, 198 insertions(+), 14 deletions(-) diff --git a/async-raft/src/core/append_entries.rs b/async-raft/src/core/append_entries.rs index 749c60ff2..663bc48de 100644 --- a/async-raft/src/core/append_entries.rs +++ b/async-raft/src/core/append_entries.rs @@ -41,7 +41,9 @@ impl, S: RaftStorage> Ra // TODO(xp): only update commit_index if the log present. e.g., append entries first, then update commit_index. self.update_next_election_timeout(true); let mut report_metrics = false; - self.commit_index = msg.leader_commit; // The value for `self.commit_index` is only updated here when not the leader. + + // The value for `self.commit_index` is only updated here when not the leader. + self.commit_index = msg.leader_commit; // Update current term if needed. if self.current_term != msg.term { @@ -101,21 +103,27 @@ impl, S: RaftStorage> Ra }); } + // last_log_id.index >= prev_log_id.index + // Previous log info doesn't immediately line up, so perform log consistency check and proceed based on its // result. - let entries = self + let prev_entry = self .storage - .get_log_entries(msg.prev_log_id.index..=msg.prev_log_id.index) + .try_get_log_entry(msg.prev_log_id.index) .await .map_err(|err| self.map_fatal_storage_error(err))?; - let target_entry = match entries.first() { + + let target_entry = match prev_entry { Some(target_entry) => target_entry, - // The target entry was not found. This can only mean that we don't have the - // specified index yet. Use the last known index & term as a conflict opt. None => { + // This can only happen if the target entry is removed, e.g., when installing snapshot or log + // compaction. + // Use the last known index & term as a conflict opt. + if report_metrics { self.report_metrics(Update::Ignore); } + return Ok(AppendEntriesResponse { term: self.current_term, success: false, diff --git a/async-raft/src/storage.rs b/async-raft/src/storage.rs index 47dbea10f..53312851a 100644 --- a/async-raft/src/storage.rs +++ b/async-raft/src/storage.rs @@ -168,13 +168,29 @@ where /// It does not return an error if in defensive mode and the log entry at `log_index` is not found. async fn try_get_log_entry(&self, log_index: u64) -> Result>>; + /// Returns the last known log id. + /// It could be the id of the last entry in log, or the last applied id that is saved in state machine. + /// + /// When there is no log or state machine, it returns (0,0) + /// + /// Caveat: an impl must hold the log-state-machine consistency or must deal with the inconsistency when accessing + /// it: + /// + /// I.e.: if `logs.last().log_id.index > last_applied.index`, `logs.last().log_id > last_applied` must hold. E.g., + /// `logs.last() == {term:1, index:2}` and `last_applied == {term:2, index:1}` is inconsistent: + /// + /// Log `{term:1, index:2}` can not be committed and should definitely be removed. The simplest way to achieve + /// consistency is to remove such inconsistent logs after a successful `append_entries` or `install_snapshot` + /// request. + /// + /// TODO(xp) test it + /// TODO(xp) defensive test about consistency + async fn get_last_log_id(&self) -> Result; + /// Delete all logs in a `range`. /// /// Errors returned from this method will cause Raft to go into shutdown. - async fn delete_logs_from + Clone + Debug + Send + Sync + Iterator>( - &self, - range: RNG, - ) -> Result<()>; + async fn delete_logs_from + Clone + Debug + Send + Sync>(&self, range: RNG) -> Result<()>; /// Append a payload of entries to the log. /// diff --git a/memstore/src/lib.rs b/memstore/src/lib.rs index 9af6d2f38..0244313b2 100644 --- a/memstore/src/lib.rs +++ b/memstore/src/lib.rs @@ -286,6 +286,27 @@ impl MemStore { std::cmp::max(log_last_id, sm_last_id) } + pub async fn defensive_consistent_log_sm(&self) -> anyhow::Result<()> { + let log_last_id = { + let log_last = self.log.read().await; + log_last.iter().last().map(|(_k, v)| v.log_id).unwrap_or_default() + }; + + let sm_last_id = self.sm.read().await.last_applied_log; + + if (log_last_id.index == sm_last_id.index && log_last_id != sm_last_id) + || (log_last_id.index > sm_last_id.index && log_last_id < sm_last_id) + { + return Err(anyhow::anyhow!( + "inconsistent log.last({}) and sm.last_applied({})", + log_last_id, + sm_last_id + )); + } + + Ok(()) + } + pub async fn defensive_apply_index_is_last_applied_plus_one( &self, entries: &[&Entry], @@ -586,11 +607,18 @@ impl RaftStorage for MemStore { Ok(log.get(&log_index).cloned()) } + #[tracing::instrument(level = "trace", skip(self))] + async fn get_last_log_id(&self) -> Result { + self.defensive_consistent_log_sm().await?; + // TODO: log id must consistent: + let log_last_id = self.log.read().await.iter().last().map(|(_k, v)| v.log_id).unwrap_or_default(); + let last_applied_id = self.sm.read().await.last_applied_log; + + Ok(max(log_last_id, last_applied_id)) + } + #[tracing::instrument(level = "trace", skip(self, range), fields(range=?range))] - async fn delete_logs_from + Clone + Debug + Send + Sync + Iterator>( - &self, - range: R, - ) -> Result<()> { + async fn delete_logs_from + Clone + Debug + Send + Sync>(&self, range: R) -> Result<()> { self.defensive_nonempty_range(range.clone()).await?; self.defensive_half_open_range(range.clone()).await?; diff --git a/memstore/src/test.rs b/memstore/src/test.rs index 1c5269050..a42d61026 100644 --- a/memstore/src/test.rs +++ b/memstore/src/test.rs @@ -113,6 +113,8 @@ where run_fut(Suite::get_initial_state_last_log_lt_sm(builder))?; run_fut(Suite::save_hard_state(builder))?; run_fut(Suite::get_log_entries(builder))?; + run_fut(Suite::try_get_log_entry(builder))?; + run_fut(Suite::get_last_log_id(builder))?; run_fut(Suite::delete_logs_from(builder))?; run_fut(Suite::append_to_log(builder))?; run_fut(Suite::apply_single(builder))?; @@ -500,6 +502,94 @@ where Ok(()) } + pub async fn try_get_log_entry(builder: &B) -> Result<()> { + let store = builder.new_store(NODE_ID).await; + Self::feed_10_logs_vote_self(&store).await?; + + let ent = store.try_get_log_entry(3).await?; + assert_eq!(Some(LogId { term: 1, index: 3 }), ent.map(|x| x.log_id)); + + let ent = store.try_get_log_entry(0).await?; + assert_eq!(None, ent.map(|x| x.log_id)); + + let ent = store.try_get_log_entry(11).await?; + assert_eq!(None, ent.map(|x| x.log_id)); + + Ok(()) + } + + pub async fn get_last_log_id(builder: &B) -> Result<()> { + let store = builder.new_store(NODE_ID).await; + + let log_id = store.get_last_log_id().await?; + assert_eq!(LogId { term: 0, index: 0 }, log_id); + + tracing::info!("--- only logs"); + { + store + .append_to_log(&[ + &Entry { + log_id: LogId { term: 1, index: 1 }, + payload: EntryPayload::Blank, + }, + &Entry { + log_id: LogId { term: 1, index: 2 }, + payload: EntryPayload::Blank, + }, + ]) + .await?; + + let log_id = store.get_last_log_id().await?; + assert_eq!(LogId { term: 1, index: 2 }, log_id); + } + + tracing::info!("--- last id in logs > last applied id in sm"); + { + store + .apply_to_state_machine(&[&Entry { + log_id: LogId { term: 1, index: 1 }, + payload: EntryPayload::Blank, + }]) + .await?; + let log_id = store.get_last_log_id().await?; + assert_eq!(LogId { term: 1, index: 2 }, log_id); + } + + tracing::info!("--- last id in logs == last applied id in sm"); + { + store + .apply_to_state_machine(&[&Entry { + log_id: LogId { term: 1, index: 2 }, + payload: EntryPayload::Blank, + }]) + .await?; + let log_id = store.get_last_log_id().await?; + assert_eq!(LogId { term: 1, index: 2 }, log_id); + } + + tracing::info!("--- last id in logs < last applied id in sm"); + { + store + .apply_to_state_machine(&[&Entry { + log_id: LogId { term: 1, index: 3 }, + payload: EntryPayload::Blank, + }]) + .await?; + let log_id = store.get_last_log_id().await?; + assert_eq!(LogId { term: 1, index: 3 }, log_id); + } + + tracing::info!("--- no logs, only last applied id in sm"); + { + store.delete_logs_from(..).await?; + + let log_id = store.get_last_log_id().await?; + assert_eq!(LogId { term: 1, index: 3 }, log_id); + } + + Ok(()) + } + pub async fn delete_logs_from(builder: &B) -> Result<()> { tracing::info!("--- delete start == stop"); { @@ -718,6 +808,7 @@ where run_fut(Suite::df_get_initial_state_dirty_log(builder))?; run_fut(Suite::df_save_hard_state_ascending(builder))?; run_fut(Suite::df_get_log_entries(builder))?; + run_fut(Suite::df_get_last_log_id(builder))?; run_fut(Suite::df_delete_logs_from_nonempty_range(builder))?; run_fut(Suite::df_append_to_log_nonempty_input(builder))?; run_fut(Suite::df_append_to_log_nonconsecutive_input(builder))?; @@ -937,6 +1028,47 @@ where Ok(()) } + pub async fn df_get_last_log_id(builder: &B) -> Result<()> { + let store = builder.new_store(NODE_ID).await; + + tracing::info!("--- last log_id.index == last_applied.index"); + { + store + .append_to_log(&[&Entry { + log_id: LogId { term: 1, index: 1 }, + payload: EntryPayload::Blank, + }]) + .await?; + + store + .apply_to_state_machine(&[&Entry { + log_id: LogId { term: 2, index: 1 }, + payload: EntryPayload::Blank, + }]) + .await?; + + let res = store.get_last_log_id().await; + assert!(res.is_err()); + } + + tracing::info!("--- last log_id.index > last_applied.index => last log_id > last_applied"); + { + store.defensive(false).await; + store + .append_to_log(&[&Entry { + log_id: LogId { term: 1, index: 2 }, + payload: EntryPayload::Blank, + }]) + .await?; + store.defensive(true).await; + + let res = store.get_last_log_id().await; + assert!(res.is_err()); + } + + Ok(()) + } + pub async fn df_delete_logs_from_nonempty_range(builder: &B) -> Result<()> { let store = builder.new_store(NODE_ID).await; Self::feed_10_logs_vote_self(&store).await?;