Skip to content

Commit

Permalink
Change: bsearch to find matching log between leader and follower
Browse files Browse the repository at this point in the history
- Refactor: simplify algo to find matching log between leader and follower.
  It adopts a binary-search like algo:

  The leader tracks the max matched log id(`self.matched`) and the least unmatched log id(`self.max_possible_matched_index`).

  The follower just responds if the `prev_log_id` in

  AppendEntriesRequest matches the log at `prev_log_id.index` in its
  store.

  Remove the case-by-case algo.

- Change: RaftStorage adds 2 new API: `try_get_log_entries()`,
  `first_id_in_log()` and `first_known_log_id()`.

  These a are not stable, may be removed soon.

- Fix: the timeout for `Wait()` should be a total timeout. Otherwise a
  `Wait()` never quits.

- Fix: when send append-entries request, if a log is not found, it
  should retry loading, but not enter snapshot state.
  Because a log may be deleted by RaftCore just after Replication read
  `prev_log_id` from the store.

- Refactor: The two replication loop: line-rate loop and snapshot loop
  should not change the `ReplicationState`, but instead returning an
  error.
  Otherwise it has to check the state everywhere.

- Refactor: simplify receiving RaftCore messages: split
  `drain_raft_rx()` into `process_raft_event()` and
  `try_drain_raft_rx()`.

- Feature: a store impl has to add an initial log at index 0 to make the
  store mathematics complete.

- Feature: add `ReplicationError` to describe all errors that is
  emitted when replicating entries or snapshot.
  • Loading branch information
drmingdrmer committed Dec 20, 2021
1 parent 7f34793 commit df68413
Show file tree
Hide file tree
Showing 26 changed files with 916 additions and 486 deletions.
24 changes: 16 additions & 8 deletions async-raft/src/core/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::replication::RaftEvent;
use crate::AppData;
use crate::AppDataResponse;
use crate::LogId;
use crate::MessageSummary;
use crate::NodeId;
use crate::RaftError;
use crate::RaftNetwork;
Expand Down Expand Up @@ -92,7 +93,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}
}

#[tracing::instrument(level = "trace", skip(self, tx))]
#[tracing::instrument(level = "debug", skip(self, tx))]
pub(super) async fn change_membership(&mut self, members: BTreeSet<NodeId>, wait: bool, tx: ResponseTx) {
// Ensure cluster will have at least one node.
if members.is_empty() {
Expand Down Expand Up @@ -177,7 +178,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
}
}

#[tracing::instrument(level = "trace", skip(self, resp_tx), fields(id=self.core.id))]
#[tracing::instrument(level = "debug", skip(self, resp_tx), fields(id=self.core.id))]
pub async fn append_membership_log(
&mut self,
mem: MembershipConfig,
Expand Down Expand Up @@ -243,11 +244,11 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
.nodes
.iter_mut()
.filter(|(id, _)| !membership.contains(id))
.filter_map(|(idx, replstate)| {
if replstate.matched.index >= index {
.filter_map(|(idx, repl_state)| {
if repl_state.matched.index >= index {
Some(*idx)
} else {
replstate.remove_after_commit = Some(index);
repl_state.remove_after_commit = Some(index);
None
}
})
Expand All @@ -261,10 +262,17 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
for target in nodes_to_remove {
tracing::debug!(target, "removing target node from replication pool");
// TODO(xp): just drop the replication then the task will be terminated.
if let Some(node) = self.nodes.remove(&target) {
let _ = node.replstream.repl_tx.send((RaftEvent::Terminate, tracing::debug_span!("CH")));
let removed = self.nodes.remove(&target);
assert!(removed.is_some());

// remove metrics entry
tracing::info!(
"handle_uniform_consensus_committed: removed replication node: {} {:?}",
target,
removed.as_ref().map(|x| (*x).summary())
);

if let Some(node) = removed {
let _ = node.replstream.repl_tx.send((RaftEvent::Terminate, tracing::debug_span!("CH")));
self.leader_metrics.replication.remove(&target);
}
}
Expand Down
Loading

0 comments on commit df68413

Please sign in to comment.