Skip to content

Commit

Permalink
Improve: move receiving snapshot chunk to sm::Worker
Browse files Browse the repository at this point in the history
Receiving a snapshot chunk should not run in the RaftCore task;
otherwise it blocks RaftCore.

In this commit, this work is moved to sm::Worker, which runs in another
task. The corresponding respond command will not be run until
sm::Worker notifies RaftCore that receiving has finished.
  • Loading branch information
drmingdrmer committed Apr 27, 2023
1 parent d9dd7d0 commit 5415420
Show file tree
Hide file tree
Showing 25 changed files with 337 additions and 206 deletions.
6 changes: 6 additions & 0 deletions openraft/src/core/command_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
/// Records the progress of state machine (sm) commands.
///
/// The derived `Default` starts with no finished command, i.e. a sequence number of `0`.
#[derive(Debug, Clone, Default)]
pub(crate) struct CommandState {
    /// Sequence number of the most recently finished sm command.
    pub(crate) finished_sm_seq: u64,
}
1 change: 1 addition & 0 deletions openraft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
//! storage or forward messages to other raft nodes.
pub(crate) mod balancer;
pub(crate) mod command_state;
pub(crate) mod notify;
mod raft_core;
mod replication_state;
Expand Down
8 changes: 8 additions & 0 deletions openraft/src/core/notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ where C: RaftTypeConfig
},
}

impl<C> Notify<C>
where C: RaftTypeConfig
{
pub(crate) fn sm(command_result: sm::CommandResult<C>) -> Self {
Self::StateMachine { command_result }
}
}

impl<C> MessageSummary<Notify<C>> for Notify<C>
where C: RaftTypeConfig
{
Expand Down
122 changes: 38 additions & 84 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ use tracing::Span;
use crate::config::Config;
use crate::config::RuntimeConfig;
use crate::core::balancer::Balancer;
use crate::core::command_state::CommandState;
use crate::core::notify::Notify;
use crate::core::sm;
use crate::core::sm::CommandSeq;
use crate::core::ServerState;
use crate::display_ext::DisplayOption;
use crate::display_ext::DisplaySlice;
Expand All @@ -42,7 +44,6 @@ use crate::error::ClientWriteError;
use crate::error::Fatal;
use crate::error::ForwardToLeader;
use crate::error::InitializeError;
use crate::error::InstallSnapshotError;
use crate::error::QuorumNotEnough;
use crate::error::RPCError;
use crate::error::Timeout;
Expand All @@ -61,7 +62,6 @@ use crate::raft::ClientWriteResponse;
use crate::raft::ClientWriteTx;
use crate::raft::ExternalCommand;
use crate::raft::InstallSnapshotRequest;
use crate::raft::InstallSnapshotResponse;
use crate::raft::InstallSnapshotTx;
use crate::raft::RaftMsg;
use crate::raft::ResultSender;
Expand Down Expand Up @@ -195,6 +195,8 @@ where

pub(crate) tx_metrics: watch::Sender<RaftMetrics<C::NodeId, C::Node>>,

pub(crate) command_state: CommandState,

pub(crate) span: Span,

pub(crate) _p: PhantomData<SM>,
Expand Down Expand Up @@ -555,82 +557,16 @@ where
/// Invoked by leader to send chunks of a snapshot to a follower.
///
/// Leaders always send chunks in order. It is important to note that, according to the Raft
/// spec, a log may only have one snapshot at any time. As snapshot contents are application
/// specific, the Raft log will only store a pointer to the snapshot file along with the
/// index & term.
/// spec, a node may only have one snapshot at any time. Snapshot contents are application
/// specific.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) async fn handle_install_snapshot_request(
pub(crate) fn handle_install_snapshot_request(
&mut self,
req: InstallSnapshotRequest<C>,
tx: InstallSnapshotTx<C::NodeId>,
) -> Result<(), StorageError<C::NodeId>> {
// TODO: move receiving to another thread.
tracing::info!(req = display(req.summary()));

let snapshot_meta = req.meta.clone();
let done = req.done;

let res = self.engine.vote_handler().accept_vote(&req.vote, tx, |state, _rejected| {
Ok(InstallSnapshotResponse {
vote: *state.vote_ref(),
})
});

let tx = match res {
Ok(tx) => tx,
Err(_) => return Ok(()),
};

// TODO(2): This is still blocking, to make it non-blocking, we need to move receiving to another
// thread.
let (recv_tx, recv_rx) = oneshot::channel::<Result<(), InstallSnapshotError>>();

let cmd = sm::Command::receive(req, recv_tx);

self.sm_handle.send(cmd).map_err(|_e| {
StorageIOError::write_snapshot(
Some(snapshot_meta.signature()),
AnyError::error("sm-worker channel closed"),
)
})?;

let recv_res = recv_rx.await.map_err(|_e| {
StorageIOError::write_snapshot(
Some(snapshot_meta.signature()),
AnyError::error("sm-worker channel closed"),
)
})?;

if let Err(e) = recv_res {
self.engine.output.push_command(Command::Respond {
when: None,
resp: Respond::new(Err(e), tx),
});
return Ok(());
}

let mut condition = None;

if done {
// If to install snapshot, we can only respond when snapshot is successfully installed.
condition = Some(Condition::Applied {
log_id: snapshot_meta.last_log_id,
});

self.engine.following_handler().install_snapshot(snapshot_meta);
}

self.engine.output.push_command(Command::Respond {
when: condition,
resp: Respond::new(
Ok(InstallSnapshotResponse {
vote: *self.engine.state.vote_ref(),
}),
tx,
),
});

Ok(())
) {
tracing::info!(req = display(req.summary()), "{}", func_name!());
self.engine.handle_install_snapshot(req, tx);
}

/// Trigger a snapshot building(log compaction) job if there is no pending building job.
Expand Down Expand Up @@ -717,6 +653,7 @@ where
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) async fn apply_to_state_machine(
&mut self,
seq: CommandSeq,
since: u64,
upto_index: u64,
) -> Result<(), StorageError<C::NodeId>> {
Expand All @@ -743,7 +680,7 @@ where

let last_applied = *entries[entries.len() - 1].get_log_id();

let cmd = sm::Command::apply(entries);
let cmd = sm::Command::apply(entries).with_seq(seq);
self.sm_handle.send(cmd).map_err(|e| StorageIOError::apply(last_applied, AnyError::error(e)))?;

Ok(())
Expand Down Expand Up @@ -1141,11 +1078,11 @@ where
RaftMsg::InstallSnapshot { rpc, tx } => {
tracing::info!(
req = display(rpc.summary()),
"received RaftMst::IntallSnapshot: {}",
"received RaftMst::InstallSnapshot: {}",
func_name!()
);

self.handle_install_snapshot_request(rpc, tx).await?;
self.handle_install_snapshot_request(rpc, tx);
}
RaftMsg::CheckIsLeaderRequest { tx } => {
if self.engine.state.is_leader(&self.engine.config.id) {
Expand Down Expand Up @@ -1335,6 +1272,14 @@ where
Notify::StateMachine { command_result } => {
tracing::debug!("sm::StateMachine command result: {:?}", command_result);

debug_assert!(
self.command_state.finished_sm_seq < command_result.command_seq,
"sm::StateMachine command result is out of order: expect {} < {}",
self.command_state.finished_sm_seq,
command_result.command_seq
);
self.command_state.finished_sm_seq = command_result.command_seq;

match command_result.result? {
sm::Response::BuildSnapshot(meta) => {
tracing::info!(
Expand All @@ -1344,9 +1289,6 @@ where
);
self.engine.finish_building_snapshot(meta);
}
sm::Response::GetSnapshot(_) => {
tracing::info!("sm::StateMachine command done: GetSnapshot: {}", func_name!());
}
sm::Response::ReceiveSnapshotChunk(_) => {
tracing::info!("sm::StateMachine command done: ReceiveSnapshotChunk: {}", func_name!());
}
Expand Down Expand Up @@ -1515,7 +1457,10 @@ where
SM: RaftStateMachine<C>,
{
async fn run_command<'e>(&mut self, cmd: Command<C>) -> Result<Option<Command<C>>, StorageError<C::NodeId>> {
if let Some(condition) = cmd.condition() {
let condition = cmd.condition();
tracing::debug!("condition: {:?}", condition);

if let Some(condition) = condition {
match condition {
Condition::LogFlushed { .. } => {
todo!()
Expand All @@ -1530,8 +1475,16 @@ where
return Ok(Some(cmd));
}
}
Condition::StateMachineCommand { .. } => {
todo!()
Condition::StateMachineCommand { command_seq } => {
if self.command_state.finished_sm_seq < *command_seq {
tracing::debug!(
"sm::Command({}) has not yet finished({}), postpone cmd: {:?}",
command_seq,
self.command_state.finished_sm_seq,
cmd
);
return Ok(Some(cmd));
}
}
}
}
Expand Down Expand Up @@ -1584,10 +1537,11 @@ where
}
}
Command::Apply {
seq,
ref already_committed,
ref upto,
} => {
self.apply_to_state_machine(already_committed.next_index(), upto.index).await?;
self.apply_to_state_machine(seq, already_committed.next_index(), upto.index).await?;
}
Command::Replicate { req, target } => {
if let Some(l) = &self.leader_data {
Expand Down
41 changes: 16 additions & 25 deletions openraft/src/core/sm/command.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;

use tokio::sync::oneshot;

use crate::display_ext::DisplaySlice;
use crate::error::InstallSnapshotError;
use crate::log_id::RaftLogId;
use crate::raft::InstallSnapshotRequest;
use crate::MessageSummary;
use crate::RaftTypeConfig;
use crate::Snapshot;
use crate::SnapshotMeta;
Expand All @@ -35,26 +33,24 @@ where C: RaftTypeConfig
impl<C> Command<C>
where C: RaftTypeConfig
{
/// Generate the next command seq with atomic increment.
#[allow(dead_code)]
fn next_seq() -> CommandSeq {
static SEQ: AtomicU64 = AtomicU64::new(1);
SEQ.fetch_add(1, Ordering::Relaxed)
}

pub(crate) fn new(payload: CommandPayload<C>) -> Self {
Self {
// seq: Self::next_seq(),
seq: 0,
payload,
}
Self { seq: 0, payload }
}

#[allow(dead_code)]
pub(crate) fn seq(&self) -> CommandSeq {
self.seq
}

pub(crate) fn with_seq(mut self, seq: CommandSeq) -> Self {
self.seq = seq;
self
}

pub(crate) fn set_seq(&mut self, seq: CommandSeq) {
self.seq = seq;
}

pub(crate) fn build_snapshot() -> Self {
let payload = CommandPayload::BuildSnapshot;
Command::new(payload)
Expand All @@ -65,14 +61,12 @@ where C: RaftTypeConfig
Command::new(payload)
}

pub(crate) fn receive(
req: InstallSnapshotRequest<C>,
tx: oneshot::Sender<Result<(), InstallSnapshotError>>,
) -> Self {
let payload = CommandPayload::ReceiveSnapshotChunk { req, tx };
pub(crate) fn receive(req: InstallSnapshotRequest<C>) -> Self {
let payload = CommandPayload::ReceiveSnapshotChunk { req };
Command::new(payload)
}

// TODO: all sm commands should have a command seq.
pub(crate) fn install_snapshot(snapshot_meta: SnapshotMeta<C::NodeId, C::Node>) -> Self {
let payload = CommandPayload::FinalizeSnapshot {
install: true,
Expand Down Expand Up @@ -117,10 +111,7 @@ where C: RaftTypeConfig
/// If it is the final chunk, the snapshot stream will be closed and saved.
///
/// Installing a snapshot includes two steps: ReceiveSnapshotChunk and FinalizeSnapshot.
ReceiveSnapshotChunk {
req: InstallSnapshotRequest<C>,
tx: oneshot::Sender<Result<(), InstallSnapshotError>>,
},
ReceiveSnapshotChunk { req: InstallSnapshotRequest<C> },

/// After receiving all chunks, finalize the snapshot by installing it or discarding it,
/// if the snapshot is stale(the snapshot last log id is smaller than the local committed).
Expand All @@ -142,7 +133,7 @@ where C: RaftTypeConfig
CommandPayload::BuildSnapshot => write!(f, "BuildSnapshot"),
CommandPayload::GetSnapshot { .. } => write!(f, "GetSnapshot"),
CommandPayload::ReceiveSnapshotChunk { req, .. } => {
write!(f, "ReceiveSnapshotChunk: {:?}", req)
write!(f, "ReceiveSnapshotChunk: {}", req.summary())
}
CommandPayload::FinalizeSnapshot { install, snapshot_meta } => {
write!(f, "FinalizeSnapshot: install:{} {:?}", install, snapshot_meta)
Expand Down
Loading

0 comments on commit 5415420

Please sign in to comment.