Skip to content

Commit

Permalink
Merge pull request #758 from drmingdrmer/43-refact
Browse files Browse the repository at this point in the history
Chore: remove unused Command::UpdateMembership
  • Loading branch information
drmingdrmer authored Apr 10, 2023
2 parents 27aca2d + c68a7ee commit 964047b
Show file tree
Hide file tree
Showing 14 changed files with 22 additions and 132 deletions.
3 changes: 0 additions & 3 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1463,9 +1463,6 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftRuntime
Command::UpdateProgressMetrics { target, matching } => {
self.update_progress_metrics(target, matching);
}
Command::UpdateMembership { .. } => {
// TODO: not used
}
Command::CancelSnapshot { snapshot_meta } => {
let got = self.received_snapshot.remove(&snapshot_meta.snapshot_id);
debug_assert!(got.is_some(), "there has to be a buffered snapshot data");
Expand Down
15 changes: 0 additions & 15 deletions openraft/src/engine/command.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::fmt::Debug;
use std::sync::Arc;

use tokio::sync::oneshot;

Expand All @@ -13,7 +12,6 @@ use crate::raft::AppendEntriesResponse;
use crate::raft::InstallSnapshotResponse;
use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
use crate::EffectiveMembership;
use crate::LogId;
use crate::MetricsChangeFlags;
use crate::Node;
Expand Down Expand Up @@ -69,17 +67,6 @@ where
/// Replicate log entries or snapshot to a target.
Replicate { target: NID, req: Inflight<NID> },

// /// Replicate a snapshot to a target.
// ReplicateSnapshot {
// target: NID,
// snapshot_last_log_id: Option<LogId<NID>>,
// },
/// Membership config changed, need to update replication streams.
UpdateMembership {
// TODO: not used yet.
membership: Arc<EffectiveMembership<NID, N>>,
},

/// Membership config changed, need to update replication streams.
/// The Runtime has to close all old replications and start new ones.
/// Because a replication stream should only report state for one membership config.
Expand Down Expand Up @@ -150,7 +137,6 @@ where
(Command::LeaderCommit { already_committed, upto, }, Command::LeaderCommit { already_committed: b_committed, upto: b_upto, }, ) => already_committed == b_committed && upto == b_upto,
(Command::FollowerCommit { already_committed, upto, }, Command::FollowerCommit { already_committed: b_committed, upto: b_upto, }, ) => already_committed == b_committed && upto == b_upto,
(Command::Replicate { target, req }, Command::Replicate { target: b_target, req: other_req, }, ) => target == b_target && req == other_req,
(Command::UpdateMembership { membership }, Command::UpdateMembership { membership: b }, ) => membership == b,
(Command::RebuildReplicationStreams { targets }, Command::RebuildReplicationStreams { targets: b }, ) => targets == b,
(Command::UpdateProgressMetrics { target, matching }, Command::UpdateProgressMetrics { target: b_target, matching: b_matching, }, ) => target == b_target && matching == b_matching,
(Command::SaveVote { vote }, Command::SaveVote { vote: b }) => vote == b,
Expand Down Expand Up @@ -187,7 +173,6 @@ where
Command::LeaderCommit { .. } => flags.set_data_changed(),
Command::FollowerCommit { .. } => flags.set_data_changed(),
Command::Replicate { .. } => {}
Command::UpdateMembership { .. } => flags.set_cluster_changed(),
Command::RebuildReplicationStreams { .. } => flags.set_replication_changed(),
Command::UpdateProgressMetrics { .. } => flags.set_replication_changed(),
Command::SaveVote { .. } => flags.set_data_changed(),
Expand Down
3 changes: 1 addition & 2 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,9 @@ where
tracing::debug!("update effective membership: log_id:{} {}", log_id, m.summary());

let em = EffectiveMembership::new_arc(Some(log_id), m.clone());
self.state.membership_state.append(em.clone());
self.state.membership_state.append(em);

self.output.push_command(Command::AppendEntry { entry });
self.output.push_command(Command::UpdateMembership { membership: em });

self.server_state_handler().update_server_state_if_changed();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,21 +164,16 @@ fn test_follower_do_append_entries_one_membership_entry() -> anyhow::Result<()>
"not in membership, become learner"
);
assert_eq!(
vec![
Command::UpdateMembership {
membership: Arc::new(EffectiveMembership::new(Some(log_id(3, 5)), m34())),
},
Command::AppendInputEntries {
entries: vec![
//
blank_ent(3, 4),
Entry::<UTCfg> {
log_id: log_id(3, 5),
payload: EntryPayload::<UTCfg>::Membership(m34()),
},
]
},
],
vec![Command::AppendInputEntries {
entries: vec![
//
blank_ent(3, 4),
Entry::<UTCfg> {
log_id: log_id(3, 5),
payload: EntryPayload::<UTCfg>::Membership(m34()),
},
]
},],
eng.output.take_commands()
);

Expand Down Expand Up @@ -230,19 +225,14 @@ fn test_follower_do_append_entries_three_membership_entries() -> anyhow::Result<
"in membership, become follower"
);
assert_eq!(
vec![
Command::UpdateMembership {
membership: Arc::new(EffectiveMembership::new(Some(log_id(4, 7)), m45())),
},
Command::AppendInputEntries {
entries: vec![
blank_ent(3, 4),
Entry::<UTCfg>::new_membership(log_id(3, 5), m01()),
Entry::<UTCfg>::new_membership(log_id(4, 6), m34()),
Entry::<UTCfg>::new_membership(log_id(4, 7), m45()),
]
},
],
vec![Command::AppendInputEntries {
entries: vec![
blank_ent(3, 4),
Entry::<UTCfg>::new_membership(log_id(3, 5), m01()),
Entry::<UTCfg>::new_membership(log_id(4, 6), m34()),
Entry::<UTCfg>::new_membership(log_id(4, 7), m45()),
]
},],
eng.output.take_commands()
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,6 @@ fn test_install_snapshot_not_conflict() -> anyhow::Result<()> {
);
assert_eq!(
vec![
Command::UpdateMembership {
membership: Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m1234()))
},
//
Command::InstallSnapshot {
snapshot_meta: SnapshotMeta {
Expand Down Expand Up @@ -215,9 +212,6 @@ fn test_install_snapshot_conflict() -> anyhow::Result<()> {
assert_eq!(
vec![
Command::DeleteConflictLog { since: log_id(2, 4) },
Command::UpdateMembership {
membership: Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m1234()))
},
//
Command::InstallSnapshot {
snapshot_meta: SnapshotMeta {
Expand Down Expand Up @@ -265,9 +259,6 @@ fn test_install_snapshot_advance_last_log_id() -> anyhow::Result<()> {
);
assert_eq!(
vec![
Command::UpdateMembership {
membership: Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m1234()))
},
//
Command::InstallSnapshot {
snapshot_meta: SnapshotMeta {
Expand Down
12 changes: 2 additions & 10 deletions openraft/src/engine/handler/following_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,7 @@ where
self.output.push_command(Command::DeleteConflictLog { since: since_log_id });

let changed = self.state.membership_state.truncate(since);
if let Some(c) = changed {
self.output.push_command(Command::UpdateMembership { membership: c });
if let Some(_c) = changed {
self.server_state_handler().update_server_state_if_changed();
}
}
Expand Down Expand Up @@ -219,10 +218,6 @@ where
"updated membership state"
);

self.output.push_command(Command::UpdateMembership {
membership: self.state.membership_state.effective().clone(),
});

self.server_state_handler().update_server_state_if_changed();
}

Expand All @@ -236,10 +231,7 @@ where
// TODO: if effective membership changes, call `update_replication()`, if a follower has replication
// streams. Now we don't have replication streams for follower, so it's ok to not call
// `update_replication()`.
let effective_changed = self.state.membership_state.update_committed(m);
if let Some(c) = effective_changed {
self.output.push_command(Command::UpdateMembership { membership: c })
}
let _effective_changed = self.state.membership_state.update_committed(m);

self.server_state_handler().update_server_state_if_changed();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,6 @@ fn test_truncate_logs_since_3() -> anyhow::Result<()> {
vec![
//
Command::DeleteConflictLog { since: log_id(2, 3) },
Command::UpdateMembership {
membership: Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m01()))
},
],
eng.output.take_commands()
);
Expand Down Expand Up @@ -186,9 +183,6 @@ fn test_truncate_logs_revert_effective_membership() -> anyhow::Result<()> {
vec![
//
Command::DeleteConflictLog { since: log_id(4, 4) },
Command::UpdateMembership {
membership: Arc::new(EffectiveMembership::new(Some(log_id(2, 3)), m01()))
},
],
eng.output.take_commands()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use maplit::btreeset;
use crate::core::ServerState;
use crate::engine::testing::UTCfg;
use crate::engine::CEngine;
use crate::engine::Command;
use crate::engine::Engine;
use crate::testing::log_id;
use crate::EffectiveMembership;
Expand Down Expand Up @@ -52,12 +51,7 @@ fn test_update_committed_membership_at_index_4() -> anyhow::Result<()> {
eng.state.membership_state
);
assert_eq!(ServerState::Learner, eng.state.server_state);
assert_eq!(
vec![Command::UpdateMembership {
membership: Arc::new(EffectiveMembership::new(Some(log_id(3, 4)), m34())),
},],
eng.output.take_commands()
);
assert_eq!(true, eng.output.take_commands().is_empty());

Ok(())
}
18 changes: 0 additions & 18 deletions openraft/src/engine/handler/leader_handler/append_entries_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,12 +278,6 @@ fn test_leader_append_entries_fast_commit_upto_membership_entry() -> anyhow::Res
already_committed: Some(log_id(0, 0)),
upto: LogId::new(CommittedLeaderId::new(3, 1), 4)
},
Command::UpdateMembership {
membership: Arc::new(EffectiveMembership::new(
Some(LogId::new(CommittedLeaderId::new(3, 1), 5)),
m34()
)),
},
Command::RebuildReplicationStreams {
targets: vec![(3, ProgressEntry::empty(7)), (4, ProgressEntry::empty(7))]
},
Expand Down Expand Up @@ -364,12 +358,6 @@ fn test_leader_append_entries_fast_commit_membership_no_voter_change() -> anyhow
already_committed: Some(log_id(0, 0)),
upto: LogId::new(CommittedLeaderId::new(3, 1), 4)
},
Command::UpdateMembership {
membership: Arc::new(EffectiveMembership::new(
Some(LogId::new(CommittedLeaderId::new(3, 1), 5)),
m1_2()
)),
},
Command::RebuildReplicationStreams {
targets: vec![(2, ProgressEntry::empty(7))]
},
Expand Down Expand Up @@ -448,12 +436,6 @@ fn test_leader_append_entries_fast_commit_if_membership_voter_change_to_1() -> a
blank_ent(3, 6),
]
},
Command::UpdateMembership {
membership: Arc::new(EffectiveMembership::new(
Some(LogId::new(CommittedLeaderId::new(3, 1), 5)),
m1_2()
)),
},
Command::RebuildReplicationStreams {
targets: vec![(2, ProgressEntry::empty(7))]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,6 @@ fn test_leader_append_membership_for_leader() -> anyhow::Result<()> {
assert_eq!(
vec![
//
Command::UpdateMembership {
membership: Arc::new(EffectiveMembership::new(Some(log_id(3, 4)), m34())),
},
Command::RebuildReplicationStreams {
targets: vec![(3, ProgressEntry::empty(0)), (4, ProgressEntry::empty(0))], /* node-2 is leader,
* won't be removed */
Expand Down
3 changes: 0 additions & 3 deletions openraft/src/engine/handler/replication_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,6 @@ where

self.state.membership_state.append(EffectiveMembership::new_arc(Some(*log_id), m.clone()));

let em = self.state.membership_state.effective();
self.output.push_command(Command::UpdateMembership { membership: em.clone() });

// TODO(9): currently only a leader has replication setup.
// It's better to setup replication for both leader and candidate.
// e.g.: if self.internal_server_state.is_leading() {
Expand Down
12 changes: 0 additions & 12 deletions openraft/src/engine/tests/append_entries_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,6 @@ fn test_append_entries_prev_log_id_conflict() -> anyhow::Result<()> {
vote: Vote::new_committed(2, 1)
},
Command::DeleteConflictLog { since: log_id(1, 2) },
Command::UpdateMembership {
membership: Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m01()))
},
],
eng.output.take_commands()
);
Expand Down Expand Up @@ -202,9 +199,6 @@ fn test_append_entries_prev_log_id_is_committed() -> anyhow::Result<()> {
vote: Vote::new_committed(2, 1)
},
Command::DeleteConflictLog { since: log_id(1, 2) },
Command::UpdateMembership {
membership: Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m01()))
},
Command::AppendInputEntries {
entries: vec![blank_ent(2, 2)]
},
Expand Down Expand Up @@ -298,12 +292,6 @@ fn test_append_entries_conflict() -> anyhow::Result<()> {
vote: Vote::new_committed(2, 1)
},
Command::DeleteConflictLog { since: log_id(2, 3) },
Command::UpdateMembership {
membership: Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m01()))
},
Command::UpdateMembership {
membership: Arc::new(EffectiveMembership::new(Some(log_id(3, 3)), m34()))
},
Command::AppendInputEntries {
entries: vec![Entry::new_membership(log_id(3, 3), m34())]
},
Expand Down
2 changes: 0 additions & 2 deletions openraft/src/engine/tests/command_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use crate::progress::Inflight;
use crate::raft::VoteRequest;
use crate::raft_types::MetricsChangeFlags;
use crate::testing::log_id;
use crate::EffectiveMembership;
use crate::Entry;
use crate::Membership;
use crate::SnapshotMeta;
Expand Down Expand Up @@ -35,7 +34,6 @@ fn test_command_update_metrics_flags() -> anyhow::Result<()> {
t(Command::LeaderCommit { already_committed: None, upto: log_id(1,2) }, false, true, false);
t(Command::FollowerCommit { already_committed: None, upto: log_id(1,2) }, false, true, false);
t(Command::Replicate { target: 3, req: Inflight::None }, false, false, false);
t(Command::UpdateMembership{ membership: EffectiveMembership::new_arc(Some(log_id(1,1)), Membership::new(vec![], ()) ) }, false, false, true);
t(Command::RebuildReplicationStreams{ targets: vec![] }, true, false, false);
t(Command::UpdateProgressMetrics{ target: 0, matching: log_id(1,2), }, true, false, false);
t(Command::SaveVote{ vote: Vote::new(1,2) }, false, true, false);
Expand Down
14 changes: 0 additions & 14 deletions openraft/src/engine/tests/initialize_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,6 @@ fn test_initialize_single_node() -> anyhow::Result<()> {
Command::AppendEntry {
entry: Entry::<Config>::new_membership(LogId::default(), m1())
},
Command::UpdateMembership {
membership: eng.state.membership_state.effective().clone()
},
// When update the effective membership, the engine set it to Follower.
// But when initializing, it will switch to Candidate at once, in the last output
// command.
Expand Down Expand Up @@ -145,24 +142,13 @@ fn test_initialize() -> anyhow::Result<()> {
assert_eq!(Some(&log_id0), eng.state.last_log_id());

assert_eq!(ServerState::Candidate, eng.state.server_state);
assert_eq!(
MetricsChangeFlags {
replication: false,
local_data: true,
cluster: true,
},
eng.output.metrics_flags
);
assert_eq!(&m12(), eng.state.membership_state.effective().membership());

assert_eq!(
vec![
Command::AppendEntry {
entry: Entry::new_membership(LogId::default(), m12())
},
Command::UpdateMembership {
membership: eng.state.membership_state.effective().clone()
},
// When update the effective membership, the engine set it to Follower.
// But when initializing, it will switch to Candidate at once, in the last output
// command.
Expand Down

0 comments on commit 964047b

Please sign in to comment.