Skip to content

Commit

Permalink
Feature: save committed log id
Browse files Browse the repository at this point in the history
- **Wrote committed log id to storage**
  Save the committed log id to storage before Raft apply log entries. This can
  recover state machine to the state corresponding to the committed log id upon
  system startup.

- **Re-applied log entries after startup**
  If the last applied log id is smaller than the committed log id saved in
  storage, re-apply log entries from the next index of last applied log id to the
  committed log id.

Version 1 storage API `RaftStorage` and Version 2 storage API
`RaftLogStorage` both add API `save_committed()` and `read_committed()`
to support saving/reading committed log id.

These two new API are optional and provides default dummy
implementation, an application does not need any modifications if it
does not need this feature.
  • Loading branch information
drmingdrmer committed Jul 16, 2023
1 parent 8626ecf commit 942ec78
Show file tree
Hide file tree
Showing 10 changed files with 221 additions and 1 deletion.
14 changes: 14 additions & 0 deletions memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ pub enum BlockOperation {
pub struct MemStore {
last_purged_log_id: RwLock<Option<LogId<MemNodeId>>>,

committed: RwLock<Option<LogId<MemNodeId>>>,

/// The Raft log. Logs are stored in serialized json.
log: RwLock<BTreeMap<u64, String>>,

Expand Down Expand Up @@ -142,6 +144,7 @@ impl MemStore {

Self {
last_purged_log_id: RwLock::new(None),
committed: RwLock::new(None),
log,
sm,
block: Mutex::new(BTreeMap::new()),
Expand Down Expand Up @@ -322,6 +325,17 @@ impl RaftStorage<TypeConfig> for Arc<MemStore> {
Ok(*self.vote.read().await)
}

async fn save_committed(&mut self, committed: Option<LogId<MemNodeId>>) -> Result<(), StorageError<MemNodeId>> {
tracing::debug!(?committed, "save_committed");
let mut c = self.committed.write().await;
*c = committed;
Ok(())
}

async fn read_committed(&mut self) -> Result<Option<LogId<MemNodeId>>, StorageError<MemNodeId>> {
Ok(*self.committed.read().await)
}

async fn last_applied_state(
&mut self,
) -> Result<(Option<LogId<MemNodeId>>, StoredMembership<MemNodeId, ()>), StorageError<MemNodeId>> {
Expand Down
1 change: 1 addition & 0 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1583,6 +1583,7 @@ where
ref already_committed,
ref upto,
} => {
self.log_store.save_committed(Some(*upto)).await?;
self.apply_to_state_machine(seq, already_committed.next_index(), upto.index).await?;
}
Command::Replicate { req, target } => {
Expand Down
2 changes: 2 additions & 0 deletions openraft/src/engine/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ where C: RaftTypeConfig

Command::StateMachine { .. } => CommandKind::StateMachine,
// Apply is firstly handled by RaftCore, then forwarded to state machine worker.
// TODO: Apply also write `committed` to log-store, which should be run in CommandKind::Log
Command::Apply { .. } => CommandKind::Main,
}
}
Expand All @@ -168,6 +169,7 @@ where C: RaftTypeConfig
Command::AppendEntry { .. } => None,
Command::AppendInputEntries { .. } => None,
Command::ReplicateCommitted { .. } => None,
// TODO: Apply also write `committed` to log-store, which should be run in CommandKind::Log
Command::Apply { .. } => None,
Command::Replicate { .. } => None,
Command::RebuildReplicationStreams { .. } => None,
Expand Down
8 changes: 8 additions & 0 deletions openraft/src/storage/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,14 @@ where
S::read_vote(self.storage_mut().await.deref_mut()).await
}

async fn save_committed(&mut self, committed: Option<LogId<C::NodeId>>) -> Result<(), StorageError<C::NodeId>> {
S::save_committed(self.storage_mut().await.deref_mut(), committed).await
}

async fn read_committed(&mut self) -> Result<Option<LogId<C::NodeId>>, StorageError<C::NodeId>> {
S::read_committed(self.storage_mut().await.deref_mut()).await
}

async fn get_log_reader(&mut self) -> Self::LogReader {
S::get_log_reader(self.storage_mut().await.deref_mut()).await
}
Expand Down
48 changes: 47 additions & 1 deletion openraft/src/storage/helper.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::marker::PhantomData;
use std::sync::Arc;

use crate::display_ext::DisplayOptionExt;
use crate::engine::LogIdList;
use crate::entry::RaftPayload;
use crate::log_id::RaftLogId;
use crate::raft_state::IOState;
use crate::raft_state::LogIOId;
use crate::storage::RaftLogReaderExt;
use crate::storage::RaftLogStorage;
use crate::storage::RaftStateMachine;
use crate::utime::UTime;
Expand Down Expand Up @@ -64,19 +66,63 @@ where
let vote = self.log_store.read_vote().await?;
let vote = vote.unwrap_or_default();

let mut committed = self.log_store.read_committed().await?;

let st = self.log_store.get_log_state().await?;
let mut last_purged_log_id = st.last_purged_log_id;
let mut last_log_id = st.last_log_id;
let (last_applied, _) = self.state_machine.applied_state().await?;

let (mut last_applied, _) = self.state_machine.applied_state().await?;

tracing::info!(
vote = display(&vote),
last_purged_log_id = display(last_purged_log_id.display()),
last_applied = display(last_applied.display()),
committed = display(committed.display()),
last_log_id = display(last_log_id.display()),
"get_initial_state"
);

// TODO: It is possible `committed < last_applied` because when installing snapshot,
// new committed should be saved, but not yet.
if committed < last_applied {
committed = last_applied;
}

// Re-apply log entries to recover SM to latest state.
if last_applied < committed {
let start = last_applied.next_index();
let end = committed.next_index();

tracing::info!("re-apply log {}..{} to state machine", start, end);

let entries = self.log_store.get_log_entries(start..end).await?;
self.state_machine.apply(entries).await?;

last_applied = committed;
}

let mem_state = self.get_membership().await?;

// Clean up dirty state: snapshot is installed but logs are not cleaned.
if last_log_id < last_applied {
self.log_store.purge(last_applied.unwrap()).await?;
last_log_id = last_applied;
last_purged_log_id = last_applied;

tracing::info!(
"Clean the hole between last_log_id({}) and last_applied({}) by purging logs to {}",
last_log_id.display(),
last_applied.display(),
last_applied.display(),
);
}

tracing::info!(
"load key log ids from ({},{}]",
last_purged_log_id.display(),
last_log_id.display()
);
let log_ids = LogIdList::load_log_ids(last_purged_log_id, last_log_id, self.log_store).await?;

// TODO: `flushed` is not set.
Expand Down
27 changes: 27 additions & 0 deletions openraft/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,33 @@ where C: RaftTypeConfig

async fn read_vote(&mut self) -> Result<Option<Vote<C::NodeId>>, StorageError<C::NodeId>>;

/// Saves the last committed log id to storage.
///
/// # Optional feature
///
/// If the state machine flushes state to disk before returning from `apply()`,
/// then the application does not need to implement this method.
///
/// Otherwise, i.e., the state machine just relies on periodical snapshot to persist state to
/// disk:
///
/// - If the `committed` log id is saved, the state machine will be recovered to the state
/// corresponding to this `committed` log id upon system startup, i.e., the state at the point
/// when the committed log id was applied.
///
/// - If the `committed` log id is not saved, Openraft will just recover the state machine to
/// the state of the last snapshot taken.
async fn save_committed(&mut self, _committed: Option<LogId<C::NodeId>>) -> Result<(), StorageError<C::NodeId>> {
// By default `committed` log id is not saved
Ok(())
}

/// Return the last saved committed log id by [`Self::save_committed`].
async fn read_committed(&mut self) -> Result<Option<LogId<C::NodeId>>, StorageError<C::NodeId>> {
// By default `committed` log id is not saved and this method just return None.
Ok(None)
}

// --- Log

/// Returns the last deleted log id and the last log id.
Expand Down
27 changes: 27 additions & 0 deletions openraft/src/storage/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,33 @@ where C: RaftTypeConfig
/// Return the last saved vote by [`Self::save_vote`].
async fn read_vote(&mut self) -> Result<Option<Vote<C::NodeId>>, StorageError<C::NodeId>>;

/// Saves the last committed log id to storage.
///
/// # Optional feature
///
/// If the state machine flushes state to disk before returning from `apply()`,
/// then the application does not need to implement this method.
///
/// Otherwise, i.e., the state machine just relies on periodical snapshot to persist state to
/// disk:
///
/// - If the `committed` log id is saved, the state machine will be recovered to the state
/// corresponding to this `committed` log id upon system startup, i.e., the state at the point
/// when the committed log id was applied.
///
/// - If the `committed` log id is not saved, Openraft will just recover the state machine to
/// the state of the last snapshot taken.
async fn save_committed(&mut self, _committed: Option<LogId<C::NodeId>>) -> Result<(), StorageError<C::NodeId>> {
// By default `committed` log id is not saved
Ok(())
}

/// Return the last saved committed log id by [`Self::save_committed`].
async fn read_committed(&mut self) -> Result<Option<LogId<C::NodeId>>, StorageError<C::NodeId>> {
// By default `committed` log id is not saved and this method just return None.
Ok(None)
}

/// Append log entries and call the `callback` once logs are persisted on disk.
///
/// It should returns immediately after saving the input log entries in memory, and calls the
Expand Down
38 changes: 38 additions & 0 deletions openraft/src/testing/suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ where
run_fut(run_test(builder, Self::get_initial_state_last_log_gt_sm))?;
run_fut(run_test(builder, Self::get_initial_state_last_log_lt_sm))?;
run_fut(run_test(builder, Self::get_initial_state_log_ids))?;
run_fut(run_test(builder, Self::get_initial_state_re_apply_committed))?;
run_fut(run_test(builder, Self::save_vote))?;
run_fut(run_test(builder, Self::get_log_entries))?;
run_fut(run_test(builder, Self::try_get_log_entry))?;
Expand Down Expand Up @@ -593,6 +594,43 @@ where
Ok(())
}

/// Test if committed logs are re-applied.
pub async fn get_initial_state_re_apply_committed(
mut store: LS,
mut sm: SM,
) -> Result<(), StorageError<C::NodeId>> {
Self::default_vote(&mut store).await?;

append(&mut store, [
blank_ent_0::<C>(1, 2),
blank_ent_0::<C>(1, 3),
blank_ent_0::<C>(1, 4),
blank_ent_0::<C>(1, 5),
])
.await?;
store.purge(log_id_0(1, 1)).await?;

apply(&mut sm, [blank_ent_0::<C>(1, 2)]).await?;

store.save_committed(Some(log_id_0(1, 4))).await?;
let got = store.read_committed().await?;
if got.is_none() {
tracing::info!("This implementation does not store committed log id, skip test re-applying committed logs");
return Ok(());
}

let initial = StorageHelper::new(&mut store, &mut sm).get_initial_state().await?;

assert_eq!(Some(&log_id_0(1, 4)), initial.io_applied(), "last_applied is updated");
assert_eq!(
Some(log_id_0(1, 4)),
sm.applied_state().await?.0,
"last_applied is updated"
);

Ok(())
}

pub async fn save_vote(mut store: LS, mut sm: SM) -> Result<(), StorageError<C::NodeId>> {
store.save_vote(&Vote::new(100, NODE_ID.into())).await?;

Expand Down
11 changes: 11 additions & 0 deletions tests/tests/log_store/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#![cfg_attr(feature = "bt", feature(error_generic_member_access))]
#![cfg_attr(feature = "bt", feature(provide_any))]

#[macro_use]
#[path = "../fixtures/mod.rs"]
mod fixtures;

// The number indicate the preferred running order for these case.
// The later tests may depend on the earlier ones.

mod t10_save_committed;
46 changes: 46 additions & 0 deletions tests/tests/log_store/t10_save_committed.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use maplit::btreeset;
use openraft::storage::RaftLogStorage;
use openraft::testing::log_id;
use openraft::Config;

use crate::fixtures::init_default_ut_tracing;
use crate::fixtures::RaftRouter;

/// Before applying log, write `committed` log id to log store.
#[async_entry::test(worker_threads = 4, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn write_committed_log_id_to_log_store() -> Result<()> {
let config = Arc::new(
Config {
enable_tick: false,
..Default::default()
}
.validate()?,
);

let mut router = RaftRouter::new(config.clone());

tracing::info!("--- initializing cluster");
let mut log_index = router.new_cluster(btreeset! {0,1,2}, btreeset! {}).await?;

log_index += router.client_request_many(0, "0", 10).await?;

for i in [0, 1, 2] {
router.wait(&i, timeout()).log(Some(log_index), "write logs").await?;
}

for id in [0, 1, 2] {
let (_, mut ls, _) = router.remove_node(id).unwrap();
let committed = ls.read_committed().await?;
assert_eq!(Some(log_id(1, 0, log_index)), committed, "node-{} committed", id);
}

Ok(())
}

fn timeout() -> Option<Duration> {
Some(Duration::from_millis(1000))
}

0 comments on commit 942ec78

Please sign in to comment.