From 6f20e1fc80fa59cc758eff731663f3becc89671c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Thu, 7 Apr 2022 21:55:36 +0800 Subject: [PATCH] Feature: add trait `RaftPayload` `RaftEntry` to access payload and entry without the need to know about user data, i.e., `AppData` or `AppDataResponse`. - With `RaftPayload` and `RaftEntry`, the protocol can be implememnted in a standalone mod, without depending on type of user data. I.e. the protocol mod does not need to know about `AppData`, or `RaftStorage`. This will make test easier to write. - Refactor: move `Entry`, `EntryPayload`, `RaftPayload`, `RaftEntry` into file `entry.rs`. --- example-raft-kv/src/network/api.rs | 2 +- example-raft-kv/src/store/mod.rs | 4 +- memstore/src/lib.rs | 4 +- openraft/src/core/admin.rs | 2 +- openraft/src/core/append_entries.rs | 4 +- openraft/src/core/client.rs | 4 +- openraft/src/core/mod.rs | 4 +- openraft/src/defensive.rs | 2 +- openraft/src/entry.rs | 152 ++++++++++++++++++ openraft/src/lib.rs | 4 + openraft/src/raft.rs | 97 +---------- openraft/src/storage.rs | 4 +- openraft/src/store_ext.rs | 2 +- openraft/src/testing/suite.rs | 4 +- .../t10_conflict_with_empty_entries.rs | 4 +- .../append_entries/t20_append_conflicts.rs | 2 +- .../t30_append_inconsistent_log.rs | 4 +- .../t40_append_updates_membership.rs | 4 +- openraft/tests/elect_compare_last_log.rs | 4 +- openraft/tests/fixtures/mod.rs | 4 +- openraft/tests/initialization.rs | 2 +- openraft/tests/log_compaction/compaction.rs | 4 +- ...9_new_leader_auto_commit_uniform_config.rs | 4 +- .../snapshot/snapshot_overrides_membership.rs | 4 +- 24 files changed, 194 insertions(+), 131 deletions(-) create mode 100644 openraft/src/entry.rs diff --git a/example-raft-kv/src/network/api.rs b/example-raft-kv/src/network/api.rs index 5078eb008..1f4b9d437 100644 --- a/example-raft-kv/src/network/api.rs +++ b/example-raft-kv/src/network/api.rs @@ -5,7 +5,7 @@ use actix_web::Responder; use openraft::error::CheckIsLeaderError; use openraft::error::Infallible; use openraft::raft::ClientWriteRequest; -use openraft::raft::EntryPayload; +use openraft::EntryPayload; use web::Json; use crate::app::ExampleApp; diff --git a/example-raft-kv/src/store/mod.rs b/example-raft-kv/src/store/mod.rs index 5088f2629..87c300f6e 100644 --- a/example-raft-kv/src/store/mod.rs +++ b/example-raft-kv/src/store/mod.rs @@ -6,12 +6,12 @@ use std::sync::Arc; use std::sync::Mutex; use openraft::async_trait::async_trait; -use openraft::raft::Entry; -use openraft::raft::EntryPayload; use openraft::storage::LogState; use openraft::storage::Snapshot; use openraft::AnyError; use openraft::EffectiveMembership; +use openraft::Entry; +use openraft::EntryPayload; use openraft::ErrorSubject; use openraft::ErrorVerb; use openraft::LogId; diff --git a/memstore/src/lib.rs b/memstore/src/lib.rs index 39bd13b3e..5152f8cca 100644 --- a/memstore/src/lib.rs +++ b/memstore/src/lib.rs @@ -10,14 +10,14 @@ use std::sync::Arc; use std::sync::Mutex; use openraft::async_trait::async_trait; -use openraft::raft::Entry; -use openraft::raft::EntryPayload; use openraft::storage::LogState; use openraft::storage::RaftLogReader; use openraft::storage::RaftSnapshotBuilder; use openraft::storage::Snapshot; use openraft::AnyError; use openraft::EffectiveMembership; +use openraft::Entry; +use openraft::EntryPayload; use openraft::ErrorSubject; use openraft::ErrorVerb; use openraft::LogId; diff --git a/openraft/src/core/admin.rs b/openraft/src/core/admin.rs index e9715e15e..ce14c8fb6 100644 --- a/openraft/src/core/admin.rs +++ b/openraft/src/core/admin.rs @@ -23,10 +23,10 @@ use crate::metrics::RemoveTarget; use crate::raft::AddLearnerResponse; use crate::raft::ChangeMembers; use crate::raft::ClientWriteResponse; -use crate::raft::EntryPayload; use crate::raft::RaftRespTx; use crate::raft_types::LogIdOptionExt; use crate::versioned::Updatable; +use crate::EntryPayload; use crate::LogId; use crate::Membership; use crate::Node; diff --git a/openraft/src/core/append_entries.rs b/openraft/src/core/append_entries.rs index a0bb92b81..8a1c63326 100644 --- a/openraft/src/core/append_entries.rs +++ b/openraft/src/core/append_entries.rs @@ -4,10 +4,10 @@ use crate::core::State; use crate::error::AppendEntriesError; use crate::raft::AppendEntriesRequest; use crate::raft::AppendEntriesResponse; -use crate::raft::Entry; -use crate::raft::EntryPayload; use crate::raft_types::LogIdOptionExt; use crate::EffectiveMembership; +use crate::Entry; +use crate::EntryPayload; use crate::LogId; use crate::MessageSummary; use crate::RaftNetworkFactory; diff --git a/openraft/src/core/client.rs b/openraft/src/core/client.rs index bac6c662e..c42aa2caa 100644 --- a/openraft/src/core/client.rs +++ b/openraft/src/core/client.rs @@ -20,10 +20,10 @@ use crate::raft::AppendEntriesRequest; use crate::raft::AppendEntriesResponse; use crate::raft::ClientWriteRequest; use crate::raft::ClientWriteResponse; -use crate::raft::Entry; -use crate::raft::EntryPayload; use crate::raft::RaftRespTx; use crate::replication::RaftEvent; +use crate::Entry; +use crate::EntryPayload; use crate::MessageSummary; use crate::RPCTypes; use crate::RaftNetwork; diff --git a/openraft/src/core/mod.rs b/openraft/src/core/mod.rs index cc1be5f50..fce9c4ea3 100644 --- a/openraft/src/core/mod.rs +++ b/openraft/src/core/mod.rs @@ -45,8 +45,6 @@ use crate::error::NotAllowed; use crate::metrics::RaftMetrics; use crate::metrics::ReplicationMetrics; use crate::raft::AddLearnerResponse; -use crate::raft::Entry; -use crate::raft::EntryPayload; use crate::raft::RaftMsg; use crate::raft::RaftRespTx; use crate::raft::VoteResponse; @@ -56,6 +54,8 @@ use crate::replication::ReplicationStream; use crate::storage::RaftSnapshotBuilder; use crate::versioned::Versioned; use crate::vote::Vote; +use crate::Entry; +use crate::EntryPayload; use crate::LogId; use crate::Membership; use crate::MessageSummary; diff --git a/openraft/src/defensive.rs b/openraft/src/defensive.rs index 103225ce5..9b772c134 100644 --- a/openraft/src/defensive.rs +++ b/openraft/src/defensive.rs @@ -4,9 +4,9 @@ use std::ops::RangeBounds; use async_trait::async_trait; -use crate::raft::Entry; use crate::raft_types::LogIdOptionExt; use crate::DefensiveError; +use crate::Entry; use crate::ErrorSubject; use crate::LogId; use crate::RaftStorage; diff --git a/openraft/src/entry.rs b/openraft/src/entry.rs new file mode 100644 index 000000000..578776dbf --- /dev/null +++ b/openraft/src/entry.rs @@ -0,0 +1,152 @@ +use std::fmt::Debug; + +use crate::LogId; +use crate::Membership; +use crate::MessageSummary; +use crate::NodeId; +use crate::RaftTypeConfig; + +/// Defines operations on an entry payload. +pub trait RaftPayload { + /// Return `Some(())` if the entry payload is blank. + fn is_blank(&self) -> bool; + + /// Return `Some(&Membership)` if the entry payload is a membership payload. + fn get_membership(&self) -> Option<&Membership>; +} + +/// Defines operations on an entry. +pub trait RaftEntry: RaftPayload { + fn get_log_id(&self) -> &LogId; + + fn set_log_id(&mut self, log_id: &LogId); +} + +/// Log entry payload variants. +#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)] +pub enum EntryPayload { + /// An empty payload committed by a new cluster leader. + Blank, + + Normal(C::D), + + /// A change-membership log entry. + Membership(Membership), +} + +impl MessageSummary for EntryPayload { + fn summary(&self) -> String { + match self { + EntryPayload::Blank => "blank".to_string(), + EntryPayload::Normal(_n) => "normal".to_string(), + EntryPayload::Membership(c) => { + format!("membership: {}", c.summary()) + } + } + } +} + +/// A Raft log entry. +#[derive(Clone, serde::Serialize, serde::Deserialize)] +pub struct Entry { + pub log_id: LogId, + + /// This entry's payload. + #[serde(bound = "")] + pub payload: EntryPayload, +} + +impl Debug for Entry +where C::D: Debug +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Entry").field("log_id", &self.log_id).field("payload", &self.payload).finish() + } +} + +impl Default for Entry { + fn default() -> Self { + Self { + log_id: LogId::default(), + payload: EntryPayload::Blank, + } + } +} + +impl MessageSummary for Entry { + fn summary(&self) -> String { + format!("{}:{}", self.log_id, self.payload.summary()) + } +} + +impl MessageSummary for Option> { + fn summary(&self) -> String { + match self { + None => "None".to_string(), + Some(x) => format!("Some({})", x.summary()), + } + } +} + +impl MessageSummary for &[Entry] { + fn summary(&self) -> String { + let entry_refs: Vec<_> = self.iter().collect(); + entry_refs.as_slice().summary() + } +} + +impl MessageSummary for &[&Entry] { + fn summary(&self) -> String { + if self.is_empty() { + return "{}".to_string(); + } + let mut res = Vec::with_capacity(self.len()); + if self.len() <= 5 { + for x in self.iter() { + let e = format!("{}:{}", x.log_id, x.payload.summary()); + res.push(e); + } + + res.join(",") + } else { + let first = *self.first().unwrap(); + let last = *self.last().unwrap(); + + format!("{} ... {}", first.summary(), last.summary()) + } + } +} + +impl RaftPayload for EntryPayload { + fn is_blank(&self) -> bool { + matches!(self, EntryPayload::Blank) + } + + fn get_membership(&self) -> Option<&Membership> { + if let EntryPayload::Membership(m) = self { + Some(m) + } else { + None + } + } +} + +impl RaftPayload for Entry { + fn is_blank(&self) -> bool { + self.payload.is_blank() + } + + fn get_membership(&self) -> Option<&Membership> { + self.payload.get_membership() + } +} + +impl RaftEntry for Entry { + fn get_log_id(&self) -> &LogId { + &self.log_id + } + + fn set_log_id(&mut self, log_id: &LogId) { + self.log_id = *log_id; + } +} diff --git a/openraft/src/lib.rs b/openraft/src/lib.rs index 77b502c77..175c8ad09 100644 --- a/openraft/src/lib.rs +++ b/openraft/src/lib.rs @@ -10,6 +10,7 @@ mod config; mod core; mod defensive; +mod entry; mod membership; mod node; mod raft_types; @@ -42,6 +43,9 @@ pub use crate::core::EffectiveMembership; pub use crate::core::State; pub use crate::defensive::DefensiveCheck; pub use crate::defensive::DefensiveCheckBase; +pub use crate::entry::Entry; +pub use crate::entry::EntryPayload; +pub use crate::entry::RaftPayload; pub use crate::membership::Membership; pub use crate::metrics::RaftMetrics; pub use crate::network::RPCTypes; diff --git a/openraft/src/raft.rs b/openraft/src/raft.rs index f4cba47dc..2869269ae 100644 --- a/openraft/src/raft.rs +++ b/openraft/src/raft.rs @@ -31,6 +31,8 @@ use crate::metrics::RaftMetrics; use crate::metrics::Wait; use crate::AppData; use crate::AppDataResponse; +use crate::Entry; +use crate::EntryPayload; use crate::LogId; use crate::Membership; use crate::MessageSummary; @@ -762,101 +764,6 @@ impl MessageSummary for AppendEntriesResponse { } } -/// A Raft log entry. -#[derive(Clone, Serialize, Deserialize)] -pub struct Entry { - pub log_id: LogId, - - /// This entry's payload. - #[serde(bound = "")] - pub payload: EntryPayload, -} - -impl Debug for Entry -where C::D: Debug -{ - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Entry").field("log_id", &self.log_id).field("payload", &self.payload).finish() - } -} - -impl Default for Entry { - fn default() -> Self { - Self { - log_id: LogId::default(), - payload: EntryPayload::Blank, - } - } -} - -impl MessageSummary for Entry { - fn summary(&self) -> String { - format!("{}:{}", self.log_id, self.payload.summary()) - } -} - -impl MessageSummary for Option> { - fn summary(&self) -> String { - match self { - None => "None".to_string(), - Some(x) => format!("Some({})", x.summary()), - } - } -} - -impl MessageSummary for &[Entry] { - fn summary(&self) -> String { - let entry_refs: Vec<_> = self.iter().collect(); - entry_refs.as_slice().summary() - } -} - -impl MessageSummary for &[&Entry] { - fn summary(&self) -> String { - if self.is_empty() { - return "{}".to_string(); - } - let mut res = Vec::with_capacity(self.len()); - if self.len() <= 5 { - for x in self.iter() { - let e = format!("{}:{}", x.log_id, x.payload.summary()); - res.push(e); - } - - res.join(",") - } else { - let first = *self.first().unwrap(); - let last = *self.last().unwrap(); - - format!("{} ... {}", first.summary(), last.summary()) - } - } -} - -/// Log entry payload variants. -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -pub enum EntryPayload { - /// An empty payload committed by a new cluster leader. - Blank, - - Normal(C::D), - - /// A change-membership log entry. - Membership(Membership), -} - -impl MessageSummary for EntryPayload { - fn summary(&self) -> String { - match self { - EntryPayload::Blank => "blank".to_string(), - EntryPayload::Normal(_n) => "normal".to_string(), - EntryPayload::Membership(c) => { - format!("membership: {}", c.summary()) - } - } - } -} - /// An RPC sent by candidates to gather votes (ยง5.2). #[derive(Debug, Serialize, Deserialize)] pub struct VoteRequest { diff --git a/openraft/src/storage.rs b/openraft/src/storage.rs index 67b0136cd..ceadd2176 100644 --- a/openraft/src/storage.rs +++ b/openraft/src/storage.rs @@ -13,10 +13,10 @@ use tokio::io::AsyncWrite; use crate::core::EffectiveMembership; use crate::defensive::check_range_matches_entries; -use crate::raft::Entry; -use crate::raft::EntryPayload; use crate::raft_types::SnapshotId; use crate::raft_types::StateMachineChanges; +use crate::Entry; +use crate::EntryPayload; use crate::LogId; use crate::LogIdOptionExt; use crate::NodeId; diff --git a/openraft/src/store_ext.rs b/openraft/src/store_ext.rs index d80e0da55..b3a1378f0 100644 --- a/openraft/src/store_ext.rs +++ b/openraft/src/store_ext.rs @@ -7,7 +7,6 @@ use std::sync::Arc; use crate::async_trait::async_trait; use crate::defensive::DefensiveCheckBase; -use crate::raft::Entry; use crate::storage::LogState; use crate::storage::RaftLogReader; use crate::storage::RaftSnapshotBuilder; @@ -15,6 +14,7 @@ use crate::storage::Snapshot; use crate::summary::MessageSummary; use crate::DefensiveCheck; use crate::EffectiveMembership; +use crate::Entry; use crate::LogId; use crate::RaftStorage; use crate::RaftStorageDebug; diff --git a/openraft/src/testing/suite.rs b/openraft/src/testing/suite.rs index 624db5c09..60d988e0d 100644 --- a/openraft/src/testing/suite.rs +++ b/openraft/src/testing/suite.rs @@ -5,8 +5,6 @@ use std::option::Option::None; use maplit::btreeset; -use crate::raft::Entry; -use crate::raft::EntryPayload; use crate::storage::InitialState; use crate::storage::LogState; use crate::testing::DefensiveStoreBuilder; @@ -15,6 +13,8 @@ use crate::AppData; use crate::AppDataResponse; use crate::DefensiveError; use crate::EffectiveMembership; +use crate::Entry; +use crate::EntryPayload; use crate::ErrorSubject; use crate::LeaderId; use crate::LogId; diff --git a/openraft/tests/append_entries/t10_conflict_with_empty_entries.rs b/openraft/tests/append_entries/t10_conflict_with_empty_entries.rs index 3c081b2eb..87cacce18 100644 --- a/openraft/tests/append_entries/t10_conflict_with_empty_entries.rs +++ b/openraft/tests/append_entries/t10_conflict_with_empty_entries.rs @@ -3,9 +3,9 @@ use std::sync::Arc; use anyhow::Result; use memstore::ClientRequest; use openraft::raft::AppendEntriesRequest; -use openraft::raft::Entry; -use openraft::raft::EntryPayload; use openraft::Config; +use openraft::Entry; +use openraft::EntryPayload; use openraft::LeaderId; use openraft::LogId; use openraft::RaftNetwork; diff --git a/openraft/tests/append_entries/t20_append_conflicts.rs b/openraft/tests/append_entries/t20_append_conflicts.rs index d4faba507..399f771d9 100644 --- a/openraft/tests/append_entries/t20_append_conflicts.rs +++ b/openraft/tests/append_entries/t20_append_conflicts.rs @@ -4,8 +4,8 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; use openraft::raft::AppendEntriesRequest; -use openraft::raft::Entry; use openraft::Config; +use openraft::Entry; use openraft::LeaderId; use openraft::LogId; use openraft::MessageSummary; diff --git a/openraft/tests/append_entries/t30_append_inconsistent_log.rs b/openraft/tests/append_entries/t30_append_inconsistent_log.rs index 231660b8b..98133763c 100644 --- a/openraft/tests/append_entries/t30_append_inconsistent_log.rs +++ b/openraft/tests/append_entries/t30_append_inconsistent_log.rs @@ -3,9 +3,9 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; -use openraft::raft::Entry; -use openraft::raft::EntryPayload; use openraft::Config; +use openraft::Entry; +use openraft::EntryPayload; use openraft::LeaderId; use openraft::LogId; use openraft::RaftLogReader; diff --git a/openraft/tests/append_entries/t40_append_updates_membership.rs b/openraft/tests/append_entries/t40_append_updates_membership.rs index 54c1914f7..06282d6f3 100644 --- a/openraft/tests/append_entries/t40_append_updates_membership.rs +++ b/openraft/tests/append_entries/t40_append_updates_membership.rs @@ -5,9 +5,9 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; use openraft::raft::AppendEntriesRequest; -use openraft::raft::Entry; -use openraft::raft::EntryPayload; use openraft::Config; +use openraft::Entry; +use openraft::EntryPayload; use openraft::LeaderId; use openraft::LogId; use openraft::Membership; diff --git a/openraft/tests/elect_compare_last_log.rs b/openraft/tests/elect_compare_last_log.rs index 4905e397a..4d2ef19aa 100644 --- a/openraft/tests/elect_compare_last_log.rs +++ b/openraft/tests/elect_compare_last_log.rs @@ -5,9 +5,9 @@ use std::time::Duration; use anyhow::Result; use fixtures::RaftRouter; use maplit::btreeset; -use openraft::raft::Entry; -use openraft::raft::EntryPayload; use openraft::Config; +use openraft::Entry; +use openraft::EntryPayload; use openraft::LeaderId; use openraft::LogId; use openraft::Membership; diff --git a/openraft/tests/fixtures/mod.rs b/openraft/tests/fixtures/mod.rs index e2ddb5f1a..51bab22ca 100644 --- a/openraft/tests/fixtures/mod.rs +++ b/openraft/tests/fixtures/mod.rs @@ -39,8 +39,6 @@ use openraft::raft::AddLearnerResponse; use openraft::raft::AppendEntriesRequest; use openraft::raft::AppendEntriesResponse; use openraft::raft::ClientWriteRequest; -use openraft::raft::Entry; -use openraft::raft::EntryPayload; use openraft::raft::InstallSnapshotRequest; use openraft::raft::InstallSnapshotResponse; use openraft::raft::VoteRequest; @@ -49,6 +47,8 @@ use openraft::storage::RaftLogReader; use openraft::storage::RaftStorage; use openraft::Config; use openraft::DefensiveCheckBase; +use openraft::Entry; +use openraft::EntryPayload; use openraft::LeaderId; use openraft::LogId; use openraft::LogIdOptionExt; diff --git a/openraft/tests/initialization.rs b/openraft/tests/initialization.rs index e7b5139da..dd3f6ce6d 100644 --- a/openraft/tests/initialization.rs +++ b/openraft/tests/initialization.rs @@ -5,9 +5,9 @@ use std::time::Duration; use anyhow::Result; use fixtures::RaftRouter; use maplit::btreeset; -use openraft::raft::EntryPayload; use openraft::Config; use openraft::EffectiveMembership; +use openraft::EntryPayload; use openraft::LeaderId; use openraft::LogId; use openraft::Membership; diff --git a/openraft/tests/log_compaction/compaction.rs b/openraft/tests/log_compaction/compaction.rs index e57189597..4a7c0b2bf 100644 --- a/openraft/tests/log_compaction/compaction.rs +++ b/openraft/tests/log_compaction/compaction.rs @@ -5,9 +5,9 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; use openraft::raft::AppendEntriesRequest; -use openraft::raft::Entry; -use openraft::raft::EntryPayload; use openraft::Config; +use openraft::Entry; +use openraft::EntryPayload; use openraft::LeaderId; use openraft::LogId; use openraft::Membership; diff --git a/openraft/tests/membership/t99_new_leader_auto_commit_uniform_config.rs b/openraft/tests/membership/t99_new_leader_auto_commit_uniform_config.rs index b76a0ac46..4869f52a6 100644 --- a/openraft/tests/membership/t99_new_leader_auto_commit_uniform_config.rs +++ b/openraft/tests/membership/t99_new_leader_auto_commit_uniform_config.rs @@ -2,9 +2,9 @@ use std::sync::Arc; use anyhow::Result; use maplit::btreeset; -use openraft::raft::Entry; -use openraft::raft::EntryPayload; use openraft::Config; +use openraft::Entry; +use openraft::EntryPayload; use openraft::LeaderId; use openraft::LogId; use openraft::Membership; diff --git a/openraft/tests/snapshot/snapshot_overrides_membership.rs b/openraft/tests/snapshot/snapshot_overrides_membership.rs index c0812868a..225295586 100644 --- a/openraft/tests/snapshot/snapshot_overrides_membership.rs +++ b/openraft/tests/snapshot/snapshot_overrides_membership.rs @@ -5,9 +5,9 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; use openraft::raft::AppendEntriesRequest; -use openraft::raft::Entry; -use openraft::raft::EntryPayload; use openraft::Config; +use openraft::Entry; +use openraft::EntryPayload; use openraft::LeaderId; use openraft::LogId; use openraft::Membership;