Skip to content

Commit

Permalink
Change: RaftMetrics.replication type to `BTreeMap<NodeId, Option<Lo…
Browse files Browse the repository at this point in the history
…gId>>`

The `RaftMetrics.replication` used to be of type
`ReplicationMetrics{ replication: BTreeMap<NodeId, ReplicationTargetMetrics> }`
which contained an atomic log index value for each
ReplicationTargetMetrics stored in the `BTreeMap`. The purpose of this
type was to reduce the cost of copying a metrics instance. However,
since the metrics report rate has been significantly reduced, this
cost is now negligible. As a result, these complicated implementations
have been removed. When reporting metrics, they can simply be cloned
from the progress information maintained by `Engine`.

### Upgrade tip

Replace usage of `RaftMetrics.replication.data().replication.get(node_id)` with
`RaftMetrics.replication.get(node_id)`.
  • Loading branch information
drmingdrmer committed Apr 24, 2023
1 parent 0bdf5b7 commit 9f8ae43
Show file tree
Hide file tree
Showing 15 changed files with 39 additions and 495 deletions.
41 changes: 8 additions & 33 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::borrow::Borrow;
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::fmt::Display;
Expand Down Expand Up @@ -49,7 +50,6 @@ use crate::log_id::LogIdOptionExt;
use crate::log_id::RaftLogId;
use crate::metrics::RaftMetrics;
use crate::metrics::ReplicationMetrics;
use crate::metrics::UpdateMatchedLogId;
use crate::progress::entry::ProgressEntry;
use crate::progress::Inflight;
use crate::progress::Progress;
Expand Down Expand Up @@ -79,8 +79,6 @@ use crate::storage::LogFlushed;
use crate::storage::RaftLogReaderExt;
use crate::storage::RaftLogStorage;
use crate::storage::RaftStateMachine;
use crate::versioned::Updatable;
use crate::versioned::Versioned;
use crate::ChangeMembers;
use crate::LogId;
use crate::Membership;
Expand Down Expand Up @@ -141,9 +139,6 @@ where SD: AsyncRead + AsyncSeek + Send + Unpin + 'static
// It requires the Engine to emit correct add/remove replication commands
pub(super) replications: BTreeMap<C::NodeId, ReplicationHandle<C::NodeId, C::Node, SD>>,

/// The metrics of all replication streams
pub(crate) replication_metrics: Versioned<ReplicationMetrics<C::NodeId>>,

/// The time to send next heartbeat.
pub(crate) next_heartbeat: Instant,
}
Expand All @@ -155,7 +150,6 @@ where SD: AsyncRead + AsyncSeek + Send + Unpin + 'static
Self {
client_resp_channels: Default::default(),
replications: BTreeMap::new(),
replication_metrics: Versioned::new(ReplicationMetrics::default()),
next_heartbeat: Instant::now(),
}
}
Expand Down Expand Up @@ -490,17 +484,20 @@ where
true
}

/// Flush cached changes of metrics to notify metrics watchers with updated metrics.
/// Then clear flags about the cached changes, to avoid unnecessary metrics report.
#[tracing::instrument(level = "debug", skip_all)]
pub fn flush_metrics(&mut self) {
let leader_metrics = self.leader_data.as_ref().map(|x| x.replication_metrics.clone());
let leader_metrics = if let Some(leader) = self.engine.internal_server_state.leading() {
let prog = &leader.progress;
Some(prog.iter().map(|(id, p)| (*id, *p.borrow())).collect())
} else {
None
};
self.report_metrics(leader_metrics);
}

/// Report a metrics payload on the current state of the Raft node.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn report_metrics(&self, replication: Option<Versioned<ReplicationMetrics<C::NodeId>>>) {
pub(crate) fn report_metrics(&self, replication: Option<ReplicationMetrics<C::NodeId>>) {
let m = RaftMetrics {
running_state: Ok(()),
id: self.id,
Expand Down Expand Up @@ -842,8 +839,6 @@ where
let _x = handle.await;
tracing::info!("Done joining removed replication : {}", target);
}

l.replication_metrics = Versioned::new(ReplicationMetrics::default());
} else {
unreachable!("it has to be a leader!!!");
};
Expand Down Expand Up @@ -1327,23 +1322,6 @@ where
}
}

#[tracing::instrument(level = "debug", skip_all)]
fn update_progress_metrics(&mut self, target: C::NodeId, matching: LogId<C::NodeId>) {
tracing::debug!(%target, ?matching, "update_leader_metrics");

if let Some(l) = &mut self.leader_data {
tracing::debug!(
target = display(target),
matching = debug(&matching),
"update replication_metrics"
);
l.replication_metrics.update(UpdateMatchedLogId { target, matching });
} else {
// This method is only called after `update_progress()`.
// And this node may become a non-leader after `update_progress()`
}
}

/// If a message is sent by a previous server state but is received by current server state,
/// it is a stale message and should be just ignored.
fn does_vote_match(&self, vote: &Vote<C::NodeId>, msg: impl Display) -> bool {
Expand Down Expand Up @@ -1526,9 +1504,6 @@ where
}
}
}
Command::UpdateProgressMetrics { target, matching } => {
self.update_progress_metrics(target, matching);
}
Command::CancelSnapshot { snapshot_meta } => {
let cmd = sm::Command::cancel_snapshot(snapshot_meta.clone());
self.sm_handle
Expand Down
12 changes: 0 additions & 12 deletions openraft/src/engine/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,6 @@ where C: RaftTypeConfig
targets: Vec<(C::NodeId, ProgressEntry<C::NodeId>)>,
},

// TODO(3): it also update the progress of a leader.
// Add doc:
// `target` can also be the leader id.
/// As the state of replication to `target` is updated, the metrics should be updated.
UpdateProgressMetrics {
target: C::NodeId,
matching: LogId<C::NodeId>,
},

/// Save vote to storage
SaveVote { vote: Vote<C::NodeId> },

Expand Down Expand Up @@ -146,7 +137,6 @@ where
(Command::FollowerCommit { already_committed, upto, }, Command::FollowerCommit { already_committed: b_committed, upto: b_upto, }, ) => already_committed == b_committed && upto == b_upto,
(Command::Replicate { target, req }, Command::Replicate { target: b_target, req: other_req, }, ) => target == b_target && req == other_req,
(Command::RebuildReplicationStreams { targets }, Command::RebuildReplicationStreams { targets: b }, ) => targets == b,
(Command::UpdateProgressMetrics { target, matching }, Command::UpdateProgressMetrics { target: b_target, matching: b_matching, }, ) => target == b_target && matching == b_matching,
(Command::SaveVote { vote }, Command::SaveVote { vote: b }) => vote == b,
(Command::SendVote { vote_req }, Command::SendVote { vote_req: b }, ) => vote_req == b,
(Command::PurgeLog { upto }, Command::PurgeLog { upto: b }) => upto == b,
Expand Down Expand Up @@ -177,7 +167,6 @@ where C: RaftTypeConfig
Command::FollowerCommit { .. } => CommandKind::StateMachine,
Command::Replicate { .. } => CommandKind::Network,
Command::RebuildReplicationStreams { .. } => CommandKind::Other,
Command::UpdateProgressMetrics { .. } => CommandKind::Other,
Command::SaveVote { .. } => CommandKind::Log,
Command::SendVote { .. } => CommandKind::Network,
Command::PurgeLog { .. } => CommandKind::Log,
Expand All @@ -204,7 +193,6 @@ where C: RaftTypeConfig
Command::FollowerCommit { .. } => None,
Command::Replicate { .. } => None,
Command::RebuildReplicationStreams { .. } => None,
Command::UpdateProgressMetrics { .. } => None,
Command::SaveVote { .. } => None,
Command::SendVote { .. } => None,
Command::PurgeLog { .. } => None,
Expand Down
8 changes: 0 additions & 8 deletions openraft/src/engine/handler/replication_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,14 +153,6 @@ where C: RaftTypeConfig

tracing::debug!(granted = display(granted.summary()), "granted after updating progress");

if node_id != self.config.id {
// TODO(3): replication metrics should also contains leader's progress
self.output.push_command(Command::UpdateProgressMetrics {
target: node_id,
matching: log_id.unwrap(),
});
}

self.try_commit_granted(granted);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,7 @@ fn test_update_matching() -> anyhow::Result<()> {
{
rh.update_matching(3, inflight_id_3, Some(log_id(1, 2)));
assert_eq!(None, rh.state.committed());
assert_eq!(
vec![
//
Command::UpdateProgressMetrics {
target: 3,
matching: log_id(1, 2),
},
],
rh.output.take_commands()
);
assert_eq!(0, rh.output.take_commands().len());
}

// progress: None, (2,1), (1,2); quorum-ed: (1,2), not at leader vote, not committed
Expand All @@ -106,10 +97,6 @@ fn test_update_matching() -> anyhow::Result<()> {
assert_eq!(Some(&log_id(2, 1)), rh.state.committed());
assert_eq!(
vec![
Command::UpdateProgressMetrics {
target: 3,
matching: log_id(2, 3),
},
Command::ReplicateCommitted {
committed: Some(log_id(2, 1))
},
Expand All @@ -129,10 +116,6 @@ fn test_update_matching() -> anyhow::Result<()> {
assert_eq!(Some(&log_id(2, 3)), rh.state.committed());
assert_eq!(
vec![
Command::UpdateProgressMetrics {
target: 1,
matching: log_id(2, 4),
},
Command::ReplicateCommitted {
committed: Some(log_id(2, 3))
},
Expand Down
2 changes: 0 additions & 2 deletions openraft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ pub mod raft;
pub mod storage;
pub mod testing;
pub mod timer;
pub mod versioned;

pub(crate) mod engine;
pub(crate) mod log_id_range;
Expand Down Expand Up @@ -95,7 +94,6 @@ pub use crate::membership::EffectiveMembership;
pub use crate::membership::Membership;
pub use crate::membership::StoredMembership;
pub use crate::metrics::RaftMetrics;
pub use crate::metrics::ReplicationTargetMetrics;
pub use crate::network::RPCTypes;
pub use crate::network::RaftNetwork;
pub use crate::network::RaftNetworkFactory;
Expand Down
11 changes: 6 additions & 5 deletions openraft/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,16 @@
//! return a stream of metrics.
mod raft_metrics;
mod replication_metrics;
mod wait;

#[cfg(test)] mod replication_metrics_test;
#[cfg(test)] mod wait_test;

use std::collections::BTreeMap;

pub use raft_metrics::RaftMetrics;
pub use replication_metrics::ReplicationMetrics;
pub use replication_metrics::ReplicationTargetMetrics;
pub(crate) use replication_metrics::UpdateMatchedLogId;
pub use wait::Wait;
pub use wait::WaitError;

use crate::LogId;

pub(crate) type ReplicationMetrics<NID> = BTreeMap<NID, Option<LogId<NID>>>;
11 changes: 6 additions & 5 deletions openraft/src/metrics/raft_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::error::Fatal;
use crate::metrics::ReplicationMetrics;
use crate::node::Node;
use crate::summary::MessageSummary;
use crate::versioned::Versioned;
use crate::LogId;
use crate::NodeId;
use crate::StoredMembership;
Expand Down Expand Up @@ -54,8 +53,8 @@ where
// ---
// --- replication ---
// ---
/// The metrics about the leader. It is Some() only when this node is leader.
pub replication: Option<Versioned<ReplicationMetrics<NID>>>,
/// The replication states. It is Some() only when this node is leader.
pub replication: Option<ReplicationMetrics<NID>>,
}

impl<NID, N> MessageSummary<RaftMetrics<NID, N>> for RaftMetrics<NID, N>
Expand All @@ -64,7 +63,7 @@ where
N: Node,
{
fn summary(&self) -> String {
format!("Metrics{{id:{},{:?}, term:{}, last_log:{:?}, last_applied:{:?}, leader:{:?}, membership:{}, snapshot:{:?}, replication:{}",
format!("Metrics{{id:{},{:?}, term:{}, last_log:{:?}, last_applied:{:?}, leader:{:?}, membership:{}, snapshot:{:?}, replication:{{{}}}",
self.id,
self.state,
self.current_term,
Expand All @@ -73,7 +72,9 @@ where
self.current_leader,
self.membership_config.summary(),
self.snapshot,
self.replication.as_ref().map(|x| x.summary()).unwrap_or_default(),
self.replication.as_ref().map(|x| {
x.iter().map(|(k, v)| format!("{}:{}", k, v.summary())).collect::<Vec<_>>().join(",")
}).unwrap_or_default(),
)
}
}
Expand Down
Loading

0 comments on commit 9f8ae43

Please sign in to comment.