Skip to content

Commit

Permalink
Change: RaftStorage: use EffectiveMembership instead of Option<_>
Browse files Browse the repository at this point in the history
The value of membership config of an uninitialized raft-node should be:
```rust
EffectiveMembership {
    log_id: None,
    membership: Membership::default(),
}

```

I.e., there is always a membership config for either an initialized or
an uninitialized raft-node.
Because `EffectiveMembership` is not a value that could be `None`, but it is more like a container of values.

Changed struct:

```rust
struct InitialState {
    pub last_membership: EffectiveMembership<C>,
    ...
}
```

Changed methods:

```rust
RaftStorage::get_membership() -> Result<EffectiveMembership<C>, _>;
RaftStorage::last_applied_state() -> Result<(_, EffectiveMembership<C>), _>;
```
  • Loading branch information
drmingdrmer committed Apr 5, 2022
1 parent 81cd344 commit 67375a2
Show file tree
Hide file tree
Showing 13 changed files with 30 additions and 59 deletions.
13 changes: 4 additions & 9 deletions example-raft-kv/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ pub struct ExampleStateMachine {
pub last_applied_log: Option<LogId<ExampleNodeId>>,

// TODO: it should not be Option.
pub last_membership: Option<EffectiveMembership<ExampleTypeConfig>>,
pub last_membership: EffectiveMembership<ExampleTypeConfig>,

/// Application data.
pub data: BTreeMap<String, String>,
Expand Down Expand Up @@ -252,13 +252,8 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {

async fn last_applied_state(
&mut self,
) -> Result<
(
Option<LogId<ExampleNodeId>>,
Option<EffectiveMembership<ExampleTypeConfig>>,
),
StorageError<ExampleNodeId>,
> {
) -> Result<(Option<LogId<ExampleNodeId>>, EffectiveMembership<ExampleTypeConfig>), StorageError<ExampleNodeId>>
{
let state_machine = self.state_machine.read().await;
Ok((state_machine.last_applied_log, state_machine.last_membership.clone()))
}
Expand Down Expand Up @@ -288,7 +283,7 @@ impl RaftStorage<ExampleTypeConfig> for Arc<ExampleStore> {
}
},
EntryPayload::Membership(ref mem) => {
sm.last_membership = Some(EffectiveMembership::new(Some(entry.log_id), mem.clone()));
sm.last_membership = EffectiveMembership::new(Some(entry.log_id), mem.clone());
res.push(ExampleResponse { value: None })
}
};
Expand Down
6 changes: 3 additions & 3 deletions memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pub struct MemStoreSnapshot {
pub struct MemStoreStateMachine {
pub last_applied_log: Option<LogId<MemNodeId>>,

pub last_membership: Option<EffectiveMembership<Config>>,
pub last_membership: EffectiveMembership<Config>,

/// A mapping of client IDs to their state info.
pub client_serial_responses: HashMap<String, (u64, Option<String>)>,
Expand Down Expand Up @@ -263,7 +263,7 @@ impl RaftStorage<Config> for Arc<MemStore> {

async fn last_applied_state(
&mut self,
) -> Result<(Option<LogId<MemNodeId>>, Option<EffectiveMembership<Config>>), StorageError<MemNodeId>> {
) -> Result<(Option<LogId<MemNodeId>>, EffectiveMembership<Config>), StorageError<MemNodeId>> {
let sm = self.sm.read().await;
Ok((sm.last_applied_log, sm.last_membership.clone()))
}
Expand Down Expand Up @@ -343,7 +343,7 @@ impl RaftStorage<Config> for Arc<MemStore> {
res.push(ClientResponse(previous));
}
EntryPayload::Membership(ref mem) => {
sm.last_membership = Some(EffectiveMembership::new(Some(entry.log_id), mem.clone()));
sm.last_membership = EffectiveMembership::new(Some(entry.log_id), mem.clone());
res.push(ClientResponse(None))
}
};
Expand Down
3 changes: 0 additions & 3 deletions openraft/src/core/append_entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,6 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
// initialized. Because a node always commit a membership log as the first log entry.
let membership = self.storage.get_membership().await?;

// NOTE: the first log, i.e. log at index 0 can also conflict with the leader and is removed.
let membership = membership.unwrap_or_default();

self.update_membership(membership);

tracing::debug!("Done update membership");
Expand Down
4 changes: 0 additions & 4 deletions openraft/src/core/install_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,6 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
let membership = self.storage.get_membership().await?;
tracing::debug!("storage membership: {:?}", membership);

assert!(membership.is_some());

let membership = membership.unwrap();

self.update_membership(membership);

self.snapshot_last_log_id = self.last_applied;
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,

self.last_log_id = state.last_log_id;
self.vote = state.vote;
self.effective_membership = Arc::new(state.last_membership.unwrap_or_default());
self.effective_membership = Arc::new(state.last_membership);
self.last_applied = state.last_applied;

// NOTE: The commit index must be determined by a leader after
Expand Down
15 changes: 6 additions & 9 deletions openraft/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub struct InitialState<C: RaftTypeConfig> {

/// The latest cluster membership configuration found, in log or in state machine, else a new initial
/// membership config consisting only of this node's ID.
pub last_membership: Option<EffectiveMembership<C>>,
pub last_membership: EffectiveMembership<C>,
}

/// The state about logs.
Expand Down Expand Up @@ -193,18 +193,15 @@ where C: RaftTypeConfig
/// TODO(xp): if there is no membership log, return `{log_id:None, membership: vec![]}`.
/// The raft core should always have an effective_membership.
/// Initially, it is empty.
async fn get_membership(&mut self) -> Result<Option<EffectiveMembership<C>>, StorageError<C::NodeId>> {
async fn get_membership(&mut self) -> Result<EffectiveMembership<C>, StorageError<C::NodeId>> {
let (_, sm_mem) = self.last_applied_state().await?;

let sm_mem_next_index = match &sm_mem {
None => 0,
Some(mem) => mem.log_id.next_index(),
};
let sm_mem_next_index = sm_mem.log_id.next_index();

let log_mem = self.last_membership_in_log(sm_mem_next_index).await?;

if log_mem.is_some() {
return Ok(log_mem);
if let Some(x) = log_mem {
return Ok(x);
}

return Ok(sm_mem);
Expand Down Expand Up @@ -311,7 +308,7 @@ where C: RaftTypeConfig
// NOTE: This can be made into sync, provided all state machines will use atomic read or the like.
async fn last_applied_state(
&mut self,
) -> Result<(Option<LogId<C::NodeId>>, Option<EffectiveMembership<C>>), StorageError<C::NodeId>>;
) -> Result<(Option<LogId<C::NodeId>>, EffectiveMembership<C>), StorageError<C::NodeId>>;

/// Apply the given payload of entries to the state machine.
///
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/store_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ where
#[tracing::instrument(level = "trace", skip(self))]
async fn last_applied_state(
&mut self,
) -> Result<(Option<LogId<C::NodeId>>, Option<EffectiveMembership<C>>), StorageError<C::NodeId>> {
) -> Result<(Option<LogId<C::NodeId>>, EffectiveMembership<C>), StorageError<C::NodeId>> {
self.inner().last_applied_state().await
}

Expand Down
23 changes: 9 additions & 14 deletions openraft/src/testing/suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ where

let membership = store.get_membership().await?;

assert!(membership.is_none());
assert_eq!(EffectiveMembership::default(), membership);

Ok(())
}
Expand All @@ -218,7 +218,6 @@ where
.await?;

let mem = store.get_membership().await?;
let mem = mem.unwrap();

assert_eq!(Membership::new(vec![btreeset! {3,4,5}], None), mem.membership,);
}
Expand All @@ -234,8 +233,6 @@ where

let mem = store.get_membership().await?;

let mem = mem.unwrap();

assert_eq!(Membership::new(vec![btreeset! {3, 4, 5}], None), mem.membership,);
}

Expand All @@ -250,8 +247,6 @@ where

let mem = store.get_membership().await?;

let mem = mem.unwrap();

assert_eq!(Membership::new(vec![btreeset! {7,8,9}], None), mem.membership,);
}

Expand Down Expand Up @@ -335,7 +330,7 @@ where

assert_eq!(
Membership::new(vec![btreeset! {3,4,5}], None),
initial.last_membership.unwrap().membership,
initial.last_membership.membership,
);
}

Expand All @@ -352,7 +347,7 @@ where

assert_eq!(
Membership::new(vec![btreeset! {3,4,5}], None),
initial.last_membership.unwrap().membership,
initial.last_membership.membership,
);
}

Expand All @@ -369,7 +364,7 @@ where

assert_eq!(
Membership::new(vec![btreeset! {1,2,3}], None),
initial.last_membership.unwrap().membership,
initial.last_membership.membership,
);
}

Expand Down Expand Up @@ -623,7 +618,7 @@ where

let (applied, membership) = store.last_applied_state().await?;
assert_eq!(None, applied);
assert_eq!(None, membership);
assert_eq!(EffectiveMembership::default(), membership);

tracing::info!("--- with last_applied and last_membership");
{
Expand All @@ -637,10 +632,10 @@ where
let (applied, membership) = store.last_applied_state().await?;
assert_eq!(Some(LogId::new(LeaderId::new(1, NODE_ID.into()), 3)), applied);
assert_eq!(
Some(EffectiveMembership::new(
EffectiveMembership::new(
Some(LogId::new(LeaderId::new(1, NODE_ID.into()), 3)),
Membership::new(vec![btreeset! {1,2}], None)
)),
),
membership
);
}
Expand All @@ -657,10 +652,10 @@ where
let (applied, membership) = store.last_applied_state().await?;
assert_eq!(Some(LogId::new(LeaderId::new(1, NODE_ID.into()), 5)), applied);
assert_eq!(
Some(EffectiveMembership::new(
EffectiveMembership::new(
Some(LogId::new(LeaderId::new(1, NODE_ID.into()), 3)),
Membership::new(vec![btreeset! {1,2}], None)
)),
),
membership
);
}
Expand Down
4 changes: 2 additions & 2 deletions openraft/tests/initialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ async fn initialization() -> Result<()> {

let sm_mem = sto.last_applied_state().await?.1;
assert_eq!(
Some(EffectiveMembership::new(
EffectiveMembership::new(
Some(LogId::new(LeaderId::new(0, 0), 0)),
Membership::new(vec![btreeset! {0,1,2}], None)
)),
),
sm_mem
);
}
Expand Down
1 change: 0 additions & 1 deletion openraft/tests/membership/t10_add_learner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,6 @@ async fn check_learner_after_leader_transfered() -> Result<()> {
{
let mut sto = router.get_storage_handle(&1)?;
let m = sto.get_membership().await?;
let m = m.unwrap();

assert_eq!(
Membership::new(vec![btreeset! {1,3,4}], Some(btreeset! {2})),
Expand Down
4 changes: 0 additions & 4 deletions openraft/tests/snapshot/snapshot_overrides_membership.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,6 @@ async fn snapshot_overrides_membership() -> Result<()> {
{
let m = sto.get_membership().await?;

let m = m.unwrap();

assert_eq!(Membership::new(vec![btreeset! {2,3}], None), m.membership);
}
}
Expand Down Expand Up @@ -133,8 +131,6 @@ async fn snapshot_overrides_membership() -> Result<()> {

let m = sto.get_membership().await?;

let m = m.unwrap();

assert_eq!(
Membership::new(vec![btreeset! {0}], Some(btreeset! {1})),
m.membership,
Expand Down
4 changes: 0 additions & 4 deletions openraft/tests/snapshot/snapshot_uses_prev_snap_membership.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,6 @@ async fn snapshot_uses_prev_snap_membership() -> Result<()> {
}
let m = sto0.get_membership().await?;

let m = m.unwrap();

assert_eq!(
Membership::new(vec![btreeset! {0,1}], None),
m.membership,
Expand Down Expand Up @@ -125,8 +123,6 @@ async fn snapshot_uses_prev_snap_membership() -> Result<()> {
}
let m = sto0.get_membership().await?;

let m = m.unwrap();

assert_eq!(
Membership::new(vec![btreeset! {0,1}], None),
m.membership,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ async fn state_machine_apply_membership() -> Result<()> {
for i in 0..=0 {
let mut sto = router.get_storage_handle(&i)?;
assert_eq!(
Some(EffectiveMembership::new(
EffectiveMembership::new(
Some(LogId::new(LeaderId::new(0, 0), 0)),
Membership::new(vec![btreeset! {0}], None)
)),
),
sto.last_applied_state().await?.1
);
}
Expand Down Expand Up @@ -101,10 +101,10 @@ async fn state_machine_apply_membership() -> Result<()> {
let mut sto = router.get_storage_handle(&i)?;
let (_, last_membership) = sto.last_applied_state().await?;
assert_eq!(
Some(EffectiveMembership::new(
EffectiveMembership::new(
Some(LogId::new(LeaderId::new(1, 0), log_index)),
Membership::new(vec![btreeset! {0, 1, 2}], Some(btreeset! {3,4}))
)),
),
last_membership
);
}
Expand Down

0 comments on commit 67375a2

Please sign in to comment.