Skip to content

Commit

Permalink
Feature: add trait RaftPayload RaftEntry to access payload and en…
Browse files Browse the repository at this point in the history
…try without the need to know about user data, i.e., `AppData` or `AppDataResponse`.

- With `RaftPayload` and `RaftEntry`, the protocol can be implememnted
  in a standalone mod, without depending on type of user data.
  I.e. the protocol mod does not need to know about `AppData`, or
  `RaftStorage<RaftTypeConfig>`.

  This will make test easier to write.

- Refactor: move `Entry`, `EntryPayload`, `RaftPayload`, `RaftEntry`
  into file `entry.rs`.
  • Loading branch information
drmingdrmer committed Apr 9, 2022
1 parent 060a9d1 commit 6f20e1f
Show file tree
Hide file tree
Showing 24 changed files with 194 additions and 131 deletions.
2 changes: 1 addition & 1 deletion example-raft-kv/src/network/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use actix_web::Responder;
use openraft::error::CheckIsLeaderError;
use openraft::error::Infallible;
use openraft::raft::ClientWriteRequest;
use openraft::raft::EntryPayload;
use openraft::EntryPayload;
use web::Json;

use crate::app::ExampleApp;
Expand Down
4 changes: 2 additions & 2 deletions example-raft-kv/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ use std::sync::Arc;
use std::sync::Mutex;

use openraft::async_trait::async_trait;
use openraft::raft::Entry;
use openraft::raft::EntryPayload;
use openraft::storage::LogState;
use openraft::storage::Snapshot;
use openraft::AnyError;
use openraft::EffectiveMembership;
use openraft::Entry;
use openraft::EntryPayload;
use openraft::ErrorSubject;
use openraft::ErrorVerb;
use openraft::LogId;
Expand Down
4 changes: 2 additions & 2 deletions memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ use std::sync::Arc;
use std::sync::Mutex;

use openraft::async_trait::async_trait;
use openraft::raft::Entry;
use openraft::raft::EntryPayload;
use openraft::storage::LogState;
use openraft::storage::RaftLogReader;
use openraft::storage::RaftSnapshotBuilder;
use openraft::storage::Snapshot;
use openraft::AnyError;
use openraft::EffectiveMembership;
use openraft::Entry;
use openraft::EntryPayload;
use openraft::ErrorSubject;
use openraft::ErrorVerb;
use openraft::LogId;
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/core/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ use crate::metrics::RemoveTarget;
use crate::raft::AddLearnerResponse;
use crate::raft::ChangeMembers;
use crate::raft::ClientWriteResponse;
use crate::raft::EntryPayload;
use crate::raft::RaftRespTx;
use crate::raft_types::LogIdOptionExt;
use crate::versioned::Updatable;
use crate::EntryPayload;
use crate::LogId;
use crate::Membership;
use crate::Node;
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/core/append_entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ use crate::core::State;
use crate::error::AppendEntriesError;
use crate::raft::AppendEntriesRequest;
use crate::raft::AppendEntriesResponse;
use crate::raft::Entry;
use crate::raft::EntryPayload;
use crate::raft_types::LogIdOptionExt;
use crate::EffectiveMembership;
use crate::Entry;
use crate::EntryPayload;
use crate::LogId;
use crate::MessageSummary;
use crate::RaftNetworkFactory;
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/core/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ use crate::raft::AppendEntriesRequest;
use crate::raft::AppendEntriesResponse;
use crate::raft::ClientWriteRequest;
use crate::raft::ClientWriteResponse;
use crate::raft::Entry;
use crate::raft::EntryPayload;
use crate::raft::RaftRespTx;
use crate::replication::RaftEvent;
use crate::Entry;
use crate::EntryPayload;
use crate::MessageSummary;
use crate::RPCTypes;
use crate::RaftNetwork;
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ use crate::error::NotAllowed;
use crate::metrics::RaftMetrics;
use crate::metrics::ReplicationMetrics;
use crate::raft::AddLearnerResponse;
use crate::raft::Entry;
use crate::raft::EntryPayload;
use crate::raft::RaftMsg;
use crate::raft::RaftRespTx;
use crate::raft::VoteResponse;
Expand All @@ -56,6 +54,8 @@ use crate::replication::ReplicationStream;
use crate::storage::RaftSnapshotBuilder;
use crate::versioned::Versioned;
use crate::vote::Vote;
use crate::Entry;
use crate::EntryPayload;
use crate::LogId;
use crate::Membership;
use crate::MessageSummary;
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/defensive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use std::ops::RangeBounds;

use async_trait::async_trait;

use crate::raft::Entry;
use crate::raft_types::LogIdOptionExt;
use crate::DefensiveError;
use crate::Entry;
use crate::ErrorSubject;
use crate::LogId;
use crate::RaftStorage;
Expand Down
152 changes: 152 additions & 0 deletions openraft/src/entry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
use std::fmt::Debug;

use crate::LogId;
use crate::Membership;
use crate::MessageSummary;
use crate::NodeId;
use crate::RaftTypeConfig;

/// Defines operations on an entry payload.
pub trait RaftPayload<NID: NodeId> {
/// Return `Some(())` if the entry payload is blank.
fn is_blank(&self) -> bool;

/// Return `Some(&Membership)` if the entry payload is a membership payload.
fn get_membership(&self) -> Option<&Membership<NID>>;
}

/// Defines operations on an entry.
pub trait RaftEntry<NID: NodeId>: RaftPayload<NID> {
fn get_log_id(&self) -> &LogId<NID>;

fn set_log_id(&mut self, log_id: &LogId<NID>);
}

/// Log entry payload variants.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum EntryPayload<C: RaftTypeConfig> {
/// An empty payload committed by a new cluster leader.
Blank,

Normal(C::D),

/// A change-membership log entry.
Membership(Membership<C::NodeId>),
}

impl<C: RaftTypeConfig> MessageSummary for EntryPayload<C> {
fn summary(&self) -> String {
match self {
EntryPayload::Blank => "blank".to_string(),
EntryPayload::Normal(_n) => "normal".to_string(),
EntryPayload::Membership(c) => {
format!("membership: {}", c.summary())
}
}
}
}

/// A Raft log entry.
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct Entry<C: RaftTypeConfig> {
pub log_id: LogId<C::NodeId>,

/// This entry's payload.
#[serde(bound = "")]
pub payload: EntryPayload<C>,
}

impl<C: RaftTypeConfig> Debug for Entry<C>
where C::D: Debug
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Entry").field("log_id", &self.log_id).field("payload", &self.payload).finish()
}
}

impl<C: RaftTypeConfig> Default for Entry<C> {
fn default() -> Self {
Self {
log_id: LogId::default(),
payload: EntryPayload::Blank,
}
}
}

impl<C: RaftTypeConfig> MessageSummary for Entry<C> {
fn summary(&self) -> String {
format!("{}:{}", self.log_id, self.payload.summary())
}
}

impl<C: RaftTypeConfig> MessageSummary for Option<Entry<C>> {
fn summary(&self) -> String {
match self {
None => "None".to_string(),
Some(x) => format!("Some({})", x.summary()),
}
}
}

impl<C: RaftTypeConfig> MessageSummary for &[Entry<C>] {
fn summary(&self) -> String {
let entry_refs: Vec<_> = self.iter().collect();
entry_refs.as_slice().summary()
}
}

impl<C: RaftTypeConfig> MessageSummary for &[&Entry<C>] {
fn summary(&self) -> String {
if self.is_empty() {
return "{}".to_string();
}
let mut res = Vec::with_capacity(self.len());
if self.len() <= 5 {
for x in self.iter() {
let e = format!("{}:{}", x.log_id, x.payload.summary());
res.push(e);
}

res.join(",")
} else {
let first = *self.first().unwrap();
let last = *self.last().unwrap();

format!("{} ... {}", first.summary(), last.summary())
}
}
}

impl<C: RaftTypeConfig> RaftPayload<C::NodeId> for EntryPayload<C> {
fn is_blank(&self) -> bool {
matches!(self, EntryPayload::Blank)
}

fn get_membership(&self) -> Option<&Membership<C::NodeId>> {
if let EntryPayload::Membership(m) = self {
Some(m)
} else {
None
}
}
}

impl<C: RaftTypeConfig> RaftPayload<C::NodeId> for Entry<C> {
fn is_blank(&self) -> bool {
self.payload.is_blank()
}

fn get_membership(&self) -> Option<&Membership<C::NodeId>> {
self.payload.get_membership()
}
}

impl<C: RaftTypeConfig> RaftEntry<C::NodeId> for Entry<C> {
fn get_log_id(&self) -> &LogId<C::NodeId> {
&self.log_id
}

fn set_log_id(&mut self, log_id: &LogId<C::NodeId>) {
self.log_id = *log_id;
}
}
4 changes: 4 additions & 0 deletions openraft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
mod config;
mod core;
mod defensive;
mod entry;
mod membership;
mod node;
mod raft_types;
Expand Down Expand Up @@ -42,6 +43,9 @@ pub use crate::core::EffectiveMembership;
pub use crate::core::State;
pub use crate::defensive::DefensiveCheck;
pub use crate::defensive::DefensiveCheckBase;
pub use crate::entry::Entry;
pub use crate::entry::EntryPayload;
pub use crate::entry::RaftPayload;
pub use crate::membership::Membership;
pub use crate::metrics::RaftMetrics;
pub use crate::network::RPCTypes;
Expand Down
97 changes: 2 additions & 95 deletions openraft/src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ use crate::metrics::RaftMetrics;
use crate::metrics::Wait;
use crate::AppData;
use crate::AppDataResponse;
use crate::Entry;
use crate::EntryPayload;
use crate::LogId;
use crate::Membership;
use crate::MessageSummary;
Expand Down Expand Up @@ -762,101 +764,6 @@ impl<NID: NodeId> MessageSummary for AppendEntriesResponse<NID> {
}
}

/// A Raft log entry.
#[derive(Clone, Serialize, Deserialize)]
pub struct Entry<C: RaftTypeConfig> {
pub log_id: LogId<C::NodeId>,

/// This entry's payload.
#[serde(bound = "")]
pub payload: EntryPayload<C>,
}

impl<C: RaftTypeConfig> Debug for Entry<C>
where C::D: Debug
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Entry").field("log_id", &self.log_id).field("payload", &self.payload).finish()
}
}

impl<C: RaftTypeConfig> Default for Entry<C> {
fn default() -> Self {
Self {
log_id: LogId::default(),
payload: EntryPayload::Blank,
}
}
}

impl<C: RaftTypeConfig> MessageSummary for Entry<C> {
fn summary(&self) -> String {
format!("{}:{}", self.log_id, self.payload.summary())
}
}

impl<C: RaftTypeConfig> MessageSummary for Option<Entry<C>> {
fn summary(&self) -> String {
match self {
None => "None".to_string(),
Some(x) => format!("Some({})", x.summary()),
}
}
}

impl<C: RaftTypeConfig> MessageSummary for &[Entry<C>] {
fn summary(&self) -> String {
let entry_refs: Vec<_> = self.iter().collect();
entry_refs.as_slice().summary()
}
}

impl<C: RaftTypeConfig> MessageSummary for &[&Entry<C>] {
fn summary(&self) -> String {
if self.is_empty() {
return "{}".to_string();
}
let mut res = Vec::with_capacity(self.len());
if self.len() <= 5 {
for x in self.iter() {
let e = format!("{}:{}", x.log_id, x.payload.summary());
res.push(e);
}

res.join(",")
} else {
let first = *self.first().unwrap();
let last = *self.last().unwrap();

format!("{} ... {}", first.summary(), last.summary())
}
}
}

/// Log entry payload variants.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum EntryPayload<C: RaftTypeConfig> {
/// An empty payload committed by a new cluster leader.
Blank,

Normal(C::D),

/// A change-membership log entry.
Membership(Membership<C::NodeId>),
}

impl<C: RaftTypeConfig> MessageSummary for EntryPayload<C> {
fn summary(&self) -> String {
match self {
EntryPayload::Blank => "blank".to_string(),
EntryPayload::Normal(_n) => "normal".to_string(),
EntryPayload::Membership(c) => {
format!("membership: {}", c.summary())
}
}
}
}

/// An RPC sent by candidates to gather votes (§5.2).
#[derive(Debug, Serialize, Deserialize)]
pub struct VoteRequest<C: RaftTypeConfig> {
Expand Down
Loading

0 comments on commit 6f20e1f

Please sign in to comment.