Skip to content

Commit

Permalink
Change: add sub error types of ReplicationError
Browse files Browse the repository at this point in the history
- Add sub errors such as Timeout and NetworkError.

- Remove ReplicationError::IO, use StorageError instead.

- Cleanup finished TODOs.
  • Loading branch information
drmingdrmer committed Jan 20, 2022
1 parent a43d7d1 commit 2254ffc
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 60 deletions.
1 change: 0 additions & 1 deletion openraft/src/core/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,6 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
Ok(())
}

// TODO(xp): remove this
#[tracing::instrument(level = "debug", skip(self, resp_tx), fields(id=self.core.id))]
pub async fn append_membership_log(
&mut self,
Expand Down
6 changes: 0 additions & 6 deletions openraft/src/core/append_entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,6 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra

#[tracing::instrument(level = "debug", skip(self))]
async fn delete_conflict_logs_since(&mut self, start: LogId) -> Result<(), StorageError> {
// TODO(xp): add a StorageAdapter to provide auxiliary APIs.
// e.g.:
// - extract and manage membership config.
// - keep track of last_log_id, first_log_id,
// RaftStorage should only provides the least basic APIs.

self.storage.delete_conflict_logs_since(start).await?;

self.last_log_id = self.storage.get_log_state().await?.last_log_id;
Expand Down
79 changes: 56 additions & 23 deletions openraft/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
//! Error types exposed by this crate.
use std::collections::BTreeSet;
use std::error::Error;
use std::fmt::Debug;
use std::time::Duration;

use anyerror::AnyError;
use serde::Deserialize;
use serde::Serialize;

Expand Down Expand Up @@ -109,7 +111,6 @@ pub enum ChangeMembershipError {
#[error(transparent)]
LearnerNotFound(#[from] LearnerNotFound),

// TODO(xp): 111 test it
#[error(transparent)]
LearnerIsLagging(#[from] LearnerIsLagging),
}
Expand Down Expand Up @@ -179,42 +180,74 @@ impl From<StorageError> for AddLearnerError {
#[non_exhaustive]
#[allow(clippy::large_enum_variant)]
pub enum ReplicationError {
#[error("seen a higher term: {higher} GT mine: {mine}")]
HigherTerm { higher: u64, mine: u64 },
#[error(transparent)]
HigherTerm(#[from] HigherTerm),

#[error("Replication is closed")]
Closed,

#[error("{0}")]
#[error(transparent)]
LackEntry(#[from] LackEntry),

#[error("leader committed index {committed_index} advances target log index {target_index} too many")]
CommittedAdvanceTooMany { committed_index: u64, target_index: u64 },
#[error(transparent)]
CommittedAdvanceTooMany(#[from] CommittedAdvanceTooMany),

// TODO(xp): two sub type: StorageError / TransportError
// TODO(xp): a sub error for just send_append_entries()
#[error("{0}")]
#[error(transparent)]
StorageError(#[from] StorageError),

#[error(transparent)]
IO {
#[backtrace]
#[from]
source: std::io::Error,
},

#[error("timeout after {timeout:?} to replicate {id}->{target}")]
Timeout {
id: NodeId,
target: NodeId,
timeout: Duration,
},
Timeout(#[from] Timeout),

#[error(transparent)]
Network {
#[backtrace]
source: anyhow::Error,
},
Network(#[from] NetworkError),
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[error("seen a higher term: {higher} GT mine: {mine}")]
pub struct HigherTerm {
pub higher: u64,
pub mine: u64,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[error("leader committed index {committed_index} advances target log index {target_index} too many")]
pub struct CommittedAdvanceTooMany {
pub committed_index: u64,
pub target_index: u64,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[error(transparent)]
pub struct NetworkError {
#[from]
source: AnyError,
}

impl NetworkError {
pub fn new<E: Error + 'static>(e: &E) -> Self {
Self {
source: AnyError::new(e),
}
}
}

impl From<anyhow::Error> for NetworkError {
fn from(e: anyhow::Error) -> Self {
Self {
source: AnyError::from(e),
}
}
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[error("timeout after {timeout:?} when {action} {id}->{target}")]
pub struct Timeout {
pub action: String,
pub id: NodeId,
pub target: NodeId,
pub timeout: Duration,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
Expand Down
1 change: 1 addition & 0 deletions openraft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub use crate::storage_error::ErrorSubject;
pub use crate::storage_error::ErrorVerb;
pub use crate::storage_error::StorageError;
pub use crate::storage_error::StorageIOError;
pub use crate::storage_error::ToStorageResult;
pub use crate::storage_error::Violation;
pub use crate::store_ext::StoreExt;
pub use crate::store_wrapper::Wrapper;
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::NodeId;

/// A trait defining the interface for a Raft network between cluster members.
///
/// See the [network chapter of the guide](https://datafuselabs.github.io/openraft/network.html)
/// See the [network chapter of the guide](https://datafuselabs.github.io/openraft/getting-started.html#3-impl-raftnetwork)
/// for details and discussion on this trait and how to implement it.
#[async_trait]
pub trait RaftNetwork<D>: Send + Sync + 'static
Expand Down
56 changes: 27 additions & 29 deletions openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,27 @@ use tracing::Span;

use crate::config::Config;
use crate::config::SnapshotPolicy;
use crate::error::CommittedAdvanceTooMany;
use crate::error::HigherTerm;
use crate::error::LackEntry;
use crate::error::NetworkError;
use crate::error::ReplicationError;
use crate::error::Timeout;
use crate::raft::AppendEntriesRequest;
use crate::raft::InstallSnapshotRequest;
use crate::raft_types::LogIdOptionExt;
use crate::raft_types::LogIndexOptionExt;
use crate::storage::Snapshot;
use crate::AppData;
use crate::AppDataResponse;
use crate::ErrorSubject;
use crate::ErrorVerb;
use crate::LogId;
use crate::MessageSummary;
use crate::NodeId;
use crate::RaftNetwork;
use crate::RaftStorage;
use crate::ToStorageResult;

#[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ReplicationMetrics {
Expand Down Expand Up @@ -217,21 +224,16 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
ReplicationError::Closed => {
self.set_target_repl_state(TargetReplState::Shutdown);
}
ReplicationError::HigherTerm { higher, mine: _ } => {
ReplicationError::HigherTerm(h) => {
let _ = self.raft_core_tx.send((
ReplicaEvent::RevertToFollower {
target: self.target,
term: higher,
term: h.higher,
},
tracing::debug_span!("CH"),
));
return;
}
ReplicationError::IO { .. } => {
tracing::error!(error=%err, "error replication to target={}", self.target);
// TODO(xp): tell core to quit?
return;
}
ReplicationError::LackEntry(lack_ent) => {
self.set_target_repl_state(TargetReplState::Snapshotting {
must_include: lack_ent.last_purged_log_id,
Expand Down Expand Up @@ -357,16 +359,17 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
Ok(res) => res,
Err(err) => {
tracing::warn!(error=%err, "error sending AppendEntries RPC to target");
return Err(ReplicationError::Network { source: err });
return Err(ReplicationError::Network(NetworkError::from(err)));
}
},
Err(timeout_err) => {
tracing::warn!(error=%timeout_err, "timeout while sending AppendEntries RPC to target");
return Err(ReplicationError::Timeout {
return Err(ReplicationError::Timeout(Timeout {
action: "send_append_entries".to_string(),
id: self.id,
target: self.target,
timeout: the_timeout,
});
}));
}
};

Expand All @@ -384,10 +387,10 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
if append_resp.term > self.term {
tracing::debug!({ append_resp.term }, "append entries failed, reverting to follower");

return Err(ReplicationError::HigherTerm {
return Err(ReplicationError::HigherTerm(HigherTerm {
higher: append_resp.term,
mine: self.term,
});
}));
}

tracing::debug!(
Expand Down Expand Up @@ -674,11 +677,11 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
}

if self.needs_snapshot() {
return Err(ReplicationError::CommittedAdvanceTooMany {
return Err(ReplicationError::CommittedAdvanceTooMany(CommittedAdvanceTooMany {
// TODO(xp) fill them
committed_index: 0,
target_index: 0,
});
}));
}

let span = tracing::debug_span!("CHrx:LineRate");
Expand Down Expand Up @@ -764,16 +767,8 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re
//
},
Err(err) => {
match err {
ReplicationError::StorageError(_) => {
return Err(err);
},
ReplicationError::IO {..} => {
return Err(err);
}
_=> {
// nothing to do
}
if let ReplicationError::StorageError(_) = err {
return Err(err);
}
}
}
Expand Down Expand Up @@ -814,16 +809,19 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re

#[tracing::instrument(level = "trace", skip(self, snapshot))]
async fn stream_snapshot(&mut self, mut snapshot: Snapshot<S::SnapshotData>) -> Result<(), ReplicationError> {
let end = snapshot.snapshot.seek(SeekFrom::End(0)).await?;
let err_x = || (ErrorSubject::Snapshot(snapshot.meta.clone()), ErrorVerb::Read);

let end = snapshot.snapshot.seek(SeekFrom::End(0)).await.sto_res(err_x)?;

let mut offset = 0;

let mut buf = Vec::with_capacity(self.config.snapshot_max_chunk_size as usize);

loop {
// Build the RPC.
snapshot.snapshot.seek(SeekFrom::Start(offset)).await?;
let n_read = snapshot.snapshot.read_buf(&mut buf).await?;
snapshot.snapshot.seek(SeekFrom::Start(offset)).await.sto_res(err_x)?;

let n_read = snapshot.snapshot.read_buf(&mut buf).await.sto_res(err_x)?;

let done = (offset + n_read as u64) == end; // If bytes read == 0, then we're done.
let req = InstallSnapshotRequest {
Expand Down Expand Up @@ -867,10 +865,10 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Re

// Handle response conditions.
if res.term > self.term {
return Err(ReplicationError::HigherTerm {
return Err(ReplicationError::HigherTerm(HigherTerm {
higher: res.term,
mine: self.term,
});
}));
}

// If we just sent the final chunk of the snapshot, then transition to lagging state.
Expand Down
23 changes: 23 additions & 0 deletions openraft/src/storage_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,29 @@ use crate::storage::HardState;
use crate::LogId;
use crate::SnapshotMeta;

/// Convert error to StorageError::IO();
pub trait ToStorageResult<T> {
/// Convert Result<T, E> to Result<T, StorageError::IO(StorageIOError)>
///
/// `f` provides error context for building the StorageIOError.
fn sto_res<F>(self, f: F) -> Result<T, StorageError>
where F: FnOnce() -> (ErrorSubject, ErrorVerb);
}

impl<T> ToStorageResult<T> for Result<T, std::io::Error> {
fn sto_res<F>(self, f: F) -> Result<T, StorageError>
where F: FnOnce() -> (ErrorSubject, ErrorVerb) {
match self {
Ok(x) => Ok(x),
Err(e) => {
let (subject, verb) = f();
let io_err = StorageIOError::new(subject, verb, AnyError::new(&e));
Err(io_err.into())
}
}
}
}

/// An error that occurs when the RaftStore impl runs defensive check of input or output.
/// E.g. re-applying an log entry is a violation that may be a potential bug.
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq, Serialize, Deserialize)]
Expand Down

0 comments on commit 2254ffc

Please sign in to comment.