Skip to content

Commit

Permalink
feature: RaftStore::get_last_log_id() to get the last known log id in…
Browse files Browse the repository at this point in the history
… log or state machine
  • Loading branch information
drmingdrmer committed Aug 29, 2021
1 parent 9086044 commit ab6689d
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 14 deletions.
20 changes: 14 additions & 6 deletions async-raft/src/core/append_entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
// TODO(xp): only update commit_index if the log present. e.g., append entries first, then update commit_index.
self.update_next_election_timeout(true);
let mut report_metrics = false;
self.commit_index = msg.leader_commit; // The value for `self.commit_index` is only updated here when not the leader.

// The value for `self.commit_index` is only updated here when not the leader.
self.commit_index = msg.leader_commit;

// Update current term if needed.
if self.current_term != msg.term {
Expand Down Expand Up @@ -101,21 +103,27 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
});
}

// last_log_id.index >= prev_log_id.index

// Previous log info doesn't immediately line up, so perform log consistency check and proceed based on its
// result.
let entries = self
let prev_entry = self
.storage
.get_log_entries(msg.prev_log_id.index..=msg.prev_log_id.index)
.try_get_log_entry(msg.prev_log_id.index)
.await
.map_err(|err| self.map_fatal_storage_error(err))?;
let target_entry = match entries.first() {

let target_entry = match prev_entry {
Some(target_entry) => target_entry,
// The target entry was not found. This can only mean that we don't have the
// specified index yet. Use the last known index & term as a conflict opt.
None => {
// This can only happen if the target entry is removed, e.g., when installing snapshot or log
// compaction.
// Use the last known index & term as a conflict opt.

if report_metrics {
self.report_metrics(Update::Ignore);
}

return Ok(AppendEntriesResponse {
term: self.current_term,
success: false,
Expand Down
24 changes: 20 additions & 4 deletions async-raft/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,29 @@ where
/// It does not return an error if in defensive mode and the log entry at `log_index` is not found.
async fn try_get_log_entry(&self, log_index: u64) -> Result<Option<Entry<D>>>;

/// Returns the last known log id.
/// It could be the id of the last entry in log, or the last applied id that is saved in state machine.
///
/// When there is no log or state machine, it returns (0,0)
///
/// Caveat: an impl must hold the log-state-machine consistency or must deal with the inconsistency when accessing
/// it:
///
/// I.e.: if `logs.last().log_id.index > last_applied.index`, `logs.last().log_id > last_applied` must hold. E.g.,
/// `logs.last() == {term:1, index:2}` and `last_applied == {term:2, index:1}` is inconsistent:
///
/// Log `{term:1, index:2}` can not be committed and should definitely be removed. The simplest way to achieve
/// consistency is to remove such inconsistent logs after a successful `append_entries` or `install_snapshot`
/// request.
///
/// TODO(xp) test it
/// TODO(xp) defensive test about consistency
async fn get_last_log_id(&self) -> Result<LogId>;

/// Delete all logs in a `range`.
///
/// Errors returned from this method will cause Raft to go into shutdown.
async fn delete_logs_from<RNG: RangeBounds<u64> + Clone + Debug + Send + Sync + Iterator>(
&self,
range: RNG,
) -> Result<()>;
async fn delete_logs_from<RNG: RangeBounds<u64> + Clone + Debug + Send + Sync>(&self, range: RNG) -> Result<()>;

/// Append a payload of entries to the log.
///
Expand Down
36 changes: 32 additions & 4 deletions memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,27 @@ impl MemStore {
std::cmp::max(log_last_id, sm_last_id)
}

pub async fn defensive_consistent_log_sm(&self) -> anyhow::Result<()> {
let log_last_id = {
let log_last = self.log.read().await;
log_last.iter().last().map(|(_k, v)| v.log_id).unwrap_or_default()
};

let sm_last_id = self.sm.read().await.last_applied_log;

if (log_last_id.index == sm_last_id.index && log_last_id != sm_last_id)
|| (log_last_id.index > sm_last_id.index && log_last_id < sm_last_id)
{
return Err(anyhow::anyhow!(
"inconsistent log.last({}) and sm.last_applied({})",
log_last_id,
sm_last_id
));
}

Ok(())
}

pub async fn defensive_apply_index_is_last_applied_plus_one<D: AppData>(
&self,
entries: &[&Entry<D>],
Expand Down Expand Up @@ -586,11 +607,18 @@ impl RaftStorage<ClientRequest, ClientResponse> for MemStore {
Ok(log.get(&log_index).cloned())
}

#[tracing::instrument(level = "trace", skip(self))]
async fn get_last_log_id(&self) -> Result<LogId> {
self.defensive_consistent_log_sm().await?;
// TODO: log id must consistent:
let log_last_id = self.log.read().await.iter().last().map(|(_k, v)| v.log_id).unwrap_or_default();
let last_applied_id = self.sm.read().await.last_applied_log;

Ok(max(log_last_id, last_applied_id))
}

#[tracing::instrument(level = "trace", skip(self, range), fields(range=?range))]
async fn delete_logs_from<R: RangeBounds<u64> + Clone + Debug + Send + Sync + Iterator>(
&self,
range: R,
) -> Result<()> {
async fn delete_logs_from<R: RangeBounds<u64> + Clone + Debug + Send + Sync>(&self, range: R) -> Result<()> {
self.defensive_nonempty_range(range.clone()).await?;
self.defensive_half_open_range(range.clone()).await?;

Expand Down
132 changes: 132 additions & 0 deletions memstore/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ where
run_fut(Suite::get_initial_state_last_log_lt_sm(builder))?;
run_fut(Suite::save_hard_state(builder))?;
run_fut(Suite::get_log_entries(builder))?;
run_fut(Suite::try_get_log_entry(builder))?;
run_fut(Suite::get_last_log_id(builder))?;
run_fut(Suite::delete_logs_from(builder))?;
run_fut(Suite::append_to_log(builder))?;
run_fut(Suite::apply_single(builder))?;
Expand Down Expand Up @@ -500,6 +502,94 @@ where
Ok(())
}

pub async fn try_get_log_entry(builder: &B) -> Result<()> {
let store = builder.new_store(NODE_ID).await;
Self::feed_10_logs_vote_self(&store).await?;

let ent = store.try_get_log_entry(3).await?;
assert_eq!(Some(LogId { term: 1, index: 3 }), ent.map(|x| x.log_id));

let ent = store.try_get_log_entry(0).await?;
assert_eq!(None, ent.map(|x| x.log_id));

let ent = store.try_get_log_entry(11).await?;
assert_eq!(None, ent.map(|x| x.log_id));

Ok(())
}

pub async fn get_last_log_id(builder: &B) -> Result<()> {
let store = builder.new_store(NODE_ID).await;

let log_id = store.get_last_log_id().await?;
assert_eq!(LogId { term: 0, index: 0 }, log_id);

tracing::info!("--- only logs");
{
store
.append_to_log(&[
&Entry {
log_id: LogId { term: 1, index: 1 },
payload: EntryPayload::Blank,
},
&Entry {
log_id: LogId { term: 1, index: 2 },
payload: EntryPayload::Blank,
},
])
.await?;

let log_id = store.get_last_log_id().await?;
assert_eq!(LogId { term: 1, index: 2 }, log_id);
}

tracing::info!("--- last id in logs > last applied id in sm");
{
store
.apply_to_state_machine(&[&Entry {
log_id: LogId { term: 1, index: 1 },
payload: EntryPayload::Blank,
}])
.await?;
let log_id = store.get_last_log_id().await?;
assert_eq!(LogId { term: 1, index: 2 }, log_id);
}

tracing::info!("--- last id in logs == last applied id in sm");
{
store
.apply_to_state_machine(&[&Entry {
log_id: LogId { term: 1, index: 2 },
payload: EntryPayload::Blank,
}])
.await?;
let log_id = store.get_last_log_id().await?;
assert_eq!(LogId { term: 1, index: 2 }, log_id);
}

tracing::info!("--- last id in logs < last applied id in sm");
{
store
.apply_to_state_machine(&[&Entry {
log_id: LogId { term: 1, index: 3 },
payload: EntryPayload::Blank,
}])
.await?;
let log_id = store.get_last_log_id().await?;
assert_eq!(LogId { term: 1, index: 3 }, log_id);
}

tracing::info!("--- no logs, only last applied id in sm");
{
store.delete_logs_from(..).await?;

let log_id = store.get_last_log_id().await?;
assert_eq!(LogId { term: 1, index: 3 }, log_id);
}

Ok(())
}

pub async fn delete_logs_from(builder: &B) -> Result<()> {
tracing::info!("--- delete start == stop");
{
Expand Down Expand Up @@ -718,6 +808,7 @@ where
run_fut(Suite::df_get_initial_state_dirty_log(builder))?;
run_fut(Suite::df_save_hard_state_ascending(builder))?;
run_fut(Suite::df_get_log_entries(builder))?;
run_fut(Suite::df_get_last_log_id(builder))?;
run_fut(Suite::df_delete_logs_from_nonempty_range(builder))?;
run_fut(Suite::df_append_to_log_nonempty_input(builder))?;
run_fut(Suite::df_append_to_log_nonconsecutive_input(builder))?;
Expand Down Expand Up @@ -937,6 +1028,47 @@ where
Ok(())
}

pub async fn df_get_last_log_id(builder: &B) -> Result<()> {
let store = builder.new_store(NODE_ID).await;

tracing::info!("--- last log_id.index == last_applied.index");
{
store
.append_to_log(&[&Entry {
log_id: LogId { term: 1, index: 1 },
payload: EntryPayload::Blank,
}])
.await?;

store
.apply_to_state_machine(&[&Entry {
log_id: LogId { term: 2, index: 1 },
payload: EntryPayload::Blank,
}])
.await?;

let res = store.get_last_log_id().await;
assert!(res.is_err());
}

tracing::info!("--- last log_id.index > last_applied.index => last log_id > last_applied");
{
store.defensive(false).await;
store
.append_to_log(&[&Entry {
log_id: LogId { term: 1, index: 2 },
payload: EntryPayload::Blank,
}])
.await?;
store.defensive(true).await;

let res = store.get_last_log_id().await;
assert!(res.is_err());
}

Ok(())
}

pub async fn df_delete_logs_from_nonempty_range(builder: &B) -> Result<()> {
let store = builder.new_store(NODE_ID).await;
Self::feed_10_logs_vote_self(&store).await?;
Expand Down

0 comments on commit ab6689d

Please sign in to comment.