Skip to content

Commit

Permalink
Fix: a leader should report leader metrics with value Update::AsIs
Browse files Browse the repository at this point in the history
…instead of `Update::Update(None)`.

- Refactor: rename methods and variables to reflect their purposes.

- Refactor: move metircs flags operations from `RaftCore` to
  `MetricsChangeFlags`.

- Refactor: do not make a method `pub` if possible.

- Refacttr: `MetricsOptionUpdater` do not need to build and reurn an
  `Update<T>` but just a `T` is quite enough.
  • Loading branch information
drmingdrmer committed Apr 1, 2022
1 parent 6409602 commit efdc321
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 77 deletions.
4 changes: 2 additions & 2 deletions openraft/src/core/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
let payload = EntryPayload::Membership(mem.clone());
let entry = self.core.append_payload_to_log(payload).await?;

self.leader_report_metrics();
self.set_leader_metrics_changed();

let cr_entry = ClientRequestEntry {
entry: Arc::new(entry),
Expand Down Expand Up @@ -325,7 +325,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
self.try_remove_replication(target);
}

self.leader_report_metrics();
self.set_leader_metrics_changed();
}

/// Remove a replication if the membership that does not include it has committed.
Expand Down
6 changes: 3 additions & 3 deletions openraft/src/core/append_entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
self.set_target_state(State::Follower); // State update will emit metrics.
}

self.update_other_metrics_option();
self.metrics_flags.set_changed_other();
}

// Caveat: [commit-index must not advance the last known consistent log](https://datafuselabs.github.io/openraft/replication.html#caveat-commit-index-must-not-advance-the-last-known-consistent-log)
Expand Down Expand Up @@ -340,7 +340,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
self.committed,
self.last_applied
);
self.update_other_metrics_option();
self.metrics_flags.set_changed_other();
return Ok(());
}

Expand All @@ -360,7 +360,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
self.last_applied = Some(last_log_id);

self.trigger_log_compaction_if_needed(false).await;
self.update_other_metrics_option();
self.metrics_flags.set_changed_other();
Ok(())
}
}
8 changes: 4 additions & 4 deletions openraft/src/core/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
pub(super) async fn commit_initial_leader_entry(&mut self) -> Result<(), StorageError<C>> {
let entry = self.core.append_payload_to_log(EntryPayload::Blank).await?;

self.leader_report_metrics();
self.set_leader_metrics_changed();

let cr_entry = ClientRequestEntry {
entry: Arc::new(entry),
Expand Down Expand Up @@ -199,7 +199,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
tx: Some(tx),
};

self.leader_report_metrics();
self.set_leader_metrics_changed();

self.replicate_client_request(entry).await?;
Ok(())
Expand All @@ -224,7 +224,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
self.core.committed = Some(log_id);
tracing::debug!(?self.core.committed, "update committed, no need to replicate");

self.leader_report_metrics();
self.set_leader_metrics_changed();
self.client_request_post_commit(req).await?;
} else {
self.awaiting_committed.push(req);
Expand Down Expand Up @@ -352,7 +352,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS

// TODO(xp): deal with partial apply.
self.core.last_applied = Some(*log_id);
self.leader_report_metrics();
self.set_leader_metrics_changed();

// TODO(xp) merge this function to replication_to_state_machine?

Expand Down
4 changes: 2 additions & 2 deletions openraft/src/core/install_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
self.set_target_state(State::Follower); // State update will emit metrics.
}

self.update_other_metrics_option();
self.metrics_flags.set_changed_other();
}

// Compare current snapshot state with received RPC and handle as needed.
Expand Down Expand Up @@ -239,7 +239,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
self.update_membership(membership);

self.snapshot_last_log_id = self.last_applied;
self.update_other_metrics_option();
self.metrics_flags.set_changed_other();

Ok(())
}
Expand Down
89 changes: 36 additions & 53 deletions openraft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ use crate::LeaderId;
use crate::LogId;
use crate::Membership;
use crate::MessageSummary;
use crate::MetricsChangeFlags;
use crate::Node;
use crate::NodeId;
use crate::RaftNetworkFactory;
use crate::RaftStorage;
use crate::RaftTypeConfig;
use crate::StorageError;
use crate::Update;
use crate::UpdateMetricsOption;

/// The currently active membership config.
///
Expand Down Expand Up @@ -150,10 +150,10 @@ impl<C: RaftTypeConfig> MessageSummary for EffectiveMembership<C> {
}
}

pub trait MetricsOptionUpdater<NID: NodeId> {
// the default impl of `get_leader_metrics_option`, for the non-leader state
fn get_leader_metrics_option(&self, _option: Update<()>) -> Update<Option<Versioned<LeaderMetrics<NID>>>> {
Update::Update(None)
pub trait MetricsProvider<NID: NodeId> {
/// The default impl for the non-leader state
fn get_leader_metrics(&self) -> Option<&Versioned<LeaderMetrics<NID>>> {
None
}
}

Expand Down Expand Up @@ -205,7 +205,7 @@ pub struct RaftCore<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<
last_heartbeat: Option<Instant>,

/// Options of update the metrics
update_metrics: UpdateMetricsOption,
metrics_flags: MetricsChangeFlags,

/// The duration until the next election timeout.
next_election_timeout: Option<Instant>,
Expand Down Expand Up @@ -250,7 +250,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
snapshot_state: None,
snapshot_last_log_id: None,
last_heartbeat: None,
update_metrics: UpdateMetricsOption::default(),
metrics_flags: MetricsChangeFlags::default(),
next_election_timeout: None,

tx_compaction,
Expand Down Expand Up @@ -305,7 +305,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
// Fetch the most recent snapshot in the system.
if let Some(snapshot) = self.storage.get_current_snapshot().await? {
self.snapshot_last_log_id = Some(snapshot.meta.last_log_id);
self.update_other_metrics_option();
self.metrics_flags.set_changed_other();
}

let has_log = if self.last_log_id.is_some() {
Expand Down Expand Up @@ -381,33 +381,19 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
}
}

pub fn update_leader_metrics_option(&mut self) {
self.update_metrics.leader = Update::Update(());
}

pub fn update_other_metrics_option(&mut self) {
self.update_metrics.other_metrics = Update::Update(());
}

pub fn reset_update_metrics(&mut self) {
// reset update_metrics
self.update_metrics = UpdateMetricsOption::default();
}

#[tracing::instrument(level = "trace", skip(self, metrics_reporter))]
pub fn check_report_metrics(&self, metrics_reporter: &impl MetricsOptionUpdater<C::NodeId>) {
let update_metrics = self.update_metrics.clone();
let leader_metrics = metrics_reporter.get_leader_metrics_option(update_metrics.leader);
let other_metrics = update_metrics.other_metrics;

match leader_metrics {
Update::AsIs => {
if other_metrics == Update::Update(()) {
self.report_metrics(Update::Update(None));
}
}
Update::Update(u) => self.report_metrics(Update::Update(u)),
pub fn report_metrics_if_needed(&self, metrics_reporter: &impl MetricsProvider<C::NodeId>) {
if !self.metrics_flags.changed() {
return;
}

let leader_metrics = if self.metrics_flags.leader {
Update::Update(metrics_reporter.get_leader_metrics().cloned())
} else {
Update::AsIs
};

self.report_metrics(leader_metrics);
}

/// Report a metrics payload on the current state of the Raft node.
Expand Down Expand Up @@ -524,7 +510,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
fn update_snapshot_state(&mut self, update: SnapshotUpdate<C>) {
if let SnapshotUpdate::SnapshotComplete(log_id) = update {
self.snapshot_last_log_id = Some(log_id);
self.update_other_metrics_option();
self.metrics_flags.set_changed_other();
}
// If snapshot state is anything other than streaming, then drop it.
if let Some(state @ SnapshotState::Streaming { .. }) = self.snapshot_state.take() {
Expand Down Expand Up @@ -813,14 +799,11 @@ struct LeaderState<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStora
pub(super) awaiting_committed: Vec<ClientRequestEntry<C>>,
}

impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> MetricsOptionUpdater<C::NodeId>
impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> MetricsProvider<C::NodeId>
for LeaderState<'a, C, N, S>
{
fn get_leader_metrics_option(&self, option: Update<()>) -> Update<Option<Versioned<LeaderMetrics<C::NodeId>>>> {
match option {
Update::AsIs => Update::AsIs,
Update::Update(_) => Update::Update(Some(self.leader_metrics.clone())),
}
fn get_leader_metrics(&self) -> Option<&Versioned<LeaderMetrics<C::NodeId>>> {
Some(&self.leader_metrics)
}
}

Expand Down Expand Up @@ -885,8 +868,8 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
let span = tracing::debug_span!("CHrx:LeaderState");
let _ent = span.enter();

self.core.check_report_metrics(&self);
self.core.reset_update_metrics();
self.core.report_metrics_if_needed(&self);
self.core.metrics_flags.reset();

tokio::select! {
Some((msg,span)) = self.core.rx_api.recv() => {
Expand Down Expand Up @@ -955,8 +938,8 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
}

/// Report metrics with leader specific states.
pub fn leader_report_metrics(&mut self) {
self.core.update_leader_metrics_option();
pub fn set_leader_metrics_changed(&mut self) {
self.core.metrics_flags.set_changed_leader();
}
}

Expand Down Expand Up @@ -1006,7 +989,7 @@ struct CandidateState<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftSt
granted: BTreeSet<C::NodeId>,
}

impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> MetricsOptionUpdater<C::NodeId>
impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> MetricsProvider<C::NodeId>
for CandidateState<'a, C, N, S>
{
// the non-leader state use the default impl of `get_leader_metrics_option`
Expand Down Expand Up @@ -1037,8 +1020,8 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Candida
return Ok(());
}

self.core.check_report_metrics(&self);
self.core.reset_update_metrics();
self.core.report_metrics_if_needed(&self);
self.core.metrics_flags.reset();

// Setup new term.
self.core.update_next_election_timeout(false); // Generates a new rand value within range.
Expand Down Expand Up @@ -1137,7 +1120,7 @@ pub struct FollowerState<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: Raf
core: &'a mut RaftCore<C, N, S>,
}

impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> MetricsOptionUpdater<C::NodeId>
impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> MetricsProvider<C::NodeId>
for FollowerState<'a, C, N, S>
{
// the non-leader state use the default impl of `get_leader_metrics_option`
Expand Down Expand Up @@ -1165,8 +1148,8 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Followe
return Ok(());
}

self.core.check_report_metrics(&self);
self.core.reset_update_metrics();
self.core.report_metrics_if_needed(&self);
self.core.metrics_flags.reset();

let election_timeout = sleep_until(self.core.get_next_election_timeout()); // Value is updated as heartbeats are received.

Expand Down Expand Up @@ -1232,7 +1215,7 @@ pub struct LearnerState<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: Raft
core: &'a mut RaftCore<C, N, S>,
}

impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> MetricsOptionUpdater<C::NodeId>
impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> MetricsProvider<C::NodeId>
for LearnerState<'a, C, N, S>
{
// the non-leader state use the default impl of `get_leader_metrics_option`
Expand All @@ -1259,8 +1242,8 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Learner
return Ok(());
}

self.core.check_report_metrics(&self);
self.core.reset_update_metrics();
self.core.report_metrics_if_needed(&self);
self.core.metrics_flags.reset();

let span = tracing::debug_span!("CHrx:LearnerState");
let _ent = span.enter();
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/core/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
}

if Some(matched) <= self.core.committed {
self.leader_report_metrics();
self.set_leader_metrics_changed();
return Ok(());
}

Expand Down Expand Up @@ -171,7 +171,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
}

// TODO(xp): does this update too frequently?
self.leader_report_metrics();
self.set_leader_metrics_changed();

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ pub use crate::raft::Raft;
pub use crate::raft::RaftTypeConfig;
pub use crate::raft_types::LogId;
pub use crate::raft_types::LogIdOptionExt;
pub(crate) use crate::raft_types::MetricsChangeFlags;
pub use crate::raft_types::SnapshotId;
pub use crate::raft_types::SnapshotSegmentId;
pub use crate::raft_types::StateMachineChanges;
pub use crate::raft_types::Update;
pub use crate::raft_types::UpdateMetricsOption;
pub use crate::replication::ReplicationMetrics;
pub use crate::storage::RaftLogReader;
pub use crate::storage::RaftSnapshotBuilder;
Expand Down
31 changes: 21 additions & 10 deletions openraft/src/raft_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,18 +144,29 @@ pub enum Update<T> {
AsIs,
}

#[derive(Debug, Clone, PartialOrd, PartialEq, Eq, Serialize, Deserialize)]
pub struct UpdateMetricsOption {
pub leader: Update<()>,
pub other_metrics: Update<()>,
/// Describes the need to update some aspect of the metrics.
#[derive(Debug, Clone, Default)]
pub(crate) struct MetricsChangeFlags {
pub leader: bool,
pub other_metrics: bool,
}

impl Default for UpdateMetricsOption {
fn default() -> Self {
UpdateMetricsOption {
leader: Update::AsIs,
other_metrics: Update::AsIs,
}
impl MetricsChangeFlags {
pub(crate) fn changed(&self) -> bool {
self.leader || self.other_metrics
}

pub(crate) fn reset(&mut self) {
self.leader = false;
self.other_metrics = false;
}

pub(crate) fn set_changed_leader(&mut self) {
self.leader = true
}

pub(crate) fn set_changed_other(&mut self) {
self.other_metrics = true
}
}

Expand Down

0 comments on commit efdc321

Please sign in to comment.