Skip to content

Commit

Permalink
change: reduce one unnecessary snapshot serialization
Browse files Browse the repository at this point in the history
- Change: `get_current_snapshot()`: remove double-serialization:
  convert MemStoreSnapshot to CurrentSnapshotData instead of serializing
  MemStoreSnapshot:

  Before:
  ```
  MemStoreSnapshot.data = serialize(state-machine)
  CurrentSnapshotData.data = serialize(MemStoreSnapshot)
  ```

  After:
  ```
  MemStoreSnapshot.data = serialize(state-machine)
  CurrentSnapshotData.data = MemStoreSnapshot.data
  ```

  when `finalize_snapshot_installation`, extract snapshot meta info from
  `InstallSnapshotRequest`. Reduce one unnecessary deserialization.

- Change: InstallSnapshotRequest: merge `snapshot_id`, `last_log_id`,
  `membership` into one field `meta`.

- Refactor: use SnapshotMeta(`snapshot_id`, `last_log_id`, `membership`) as
  a container of metadata of a snapshot.
  Reduce parameters.

- Refactor: remove redundent param `delete_through` from
  `finalize_snapshot_installation`.
  • Loading branch information
drmingdrmer committed Jul 14, 2021
1 parent dba2403 commit 0c870cc
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 116 deletions.
39 changes: 15 additions & 24 deletions async-raft/src/core/install_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
return self.begin_installing_snapshot(req).await;
}
Some(SnapshotState::Streaming { snapshot, id, offset }) => {
if req.snapshot_id == id {
return self.continue_installing_snapshot(req, offset, id, snapshot).await;
if req.meta.snapshot_id == id {
return self.continue_installing_snapshot(req, offset, snapshot).await;
}

if req.offset == 0 {
Expand All @@ -85,7 +85,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
Err(RaftError::SnapshotMismatch {
expect: SnapshotSegmentId { id: id.clone(), offset },
got: SnapshotSegmentId {
id: req.snapshot_id.clone(),
id: req.meta.snapshot_id.clone(),
offset: req.offset,
},
})
Expand All @@ -96,19 +96,19 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
#[tracing::instrument(level = "trace", skip(self, req))]
async fn begin_installing_snapshot(&mut self, req: InstallSnapshotRequest) -> RaftResult<InstallSnapshotResponse> {
// Create a new snapshot and begin writing its contents.
let id = req.snapshot_id.clone();
let id = req.meta.snapshot_id.clone();
let mut snapshot = self.storage.create_snapshot().await.map_err(|err| self.map_fatal_storage_error(err))?;
snapshot.as_mut().write_all(&req.data).await?;

// If this was a small snapshot, and it is already done, then finish up.
if req.done {
self.finalize_snapshot_installation(req, id, snapshot).await?;
self.finalize_snapshot_installation(req, snapshot).await?;
return Ok(InstallSnapshotResponse {
term: self.current_term,
});
}

// Else, retain snapshot components for later segments & respod.
// Else, retain snapshot components for later segments & respond.
self.snapshot_state = Some(SnapshotState::Streaming {
offset: req.data.len() as u64,
id,
Expand All @@ -124,9 +124,10 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
&mut self,
req: InstallSnapshotRequest,
mut offset: u64,
id: String,
mut snapshot: Box<S::Snapshot>,
) -> RaftResult<InstallSnapshotResponse> {
let id = req.meta.snapshot_id.clone();

// Always seek to the target offset if not an exact match.
if req.offset != offset {
if let Err(err) = snapshot.as_mut().seek(SeekFrom::Start(req.offset)).await {
Expand All @@ -145,7 +146,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra

// If the snapshot stream is done, then finalize.
if req.done {
self.finalize_snapshot_installation(req, id, snapshot).await?;
self.finalize_snapshot_installation(req, snapshot).await?;
} else {
self.snapshot_state = Some(SnapshotState::Streaming { offset, id, snapshot });
}
Expand All @@ -161,30 +162,20 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
async fn finalize_snapshot_installation(
&mut self,
req: InstallSnapshotRequest,
id: String,
mut snapshot: Box<S::Snapshot>,
) -> RaftResult<()> {
snapshot.as_mut().shutdown().await.map_err(|err| self.map_fatal_storage_error(err.into()))?;
let delete_through = if self.last_log_id.index > req.last_log_id.index {
Some(req.last_log_id.index)
} else {
None
};

self.storage
.finalize_snapshot_installation(
req.last_log_id.index,
req.last_log_id.term,
delete_through,
id,
snapshot,
)
.finalize_snapshot_installation(&req.meta, snapshot)
.await
.map_err(|err| self.map_fatal_storage_error(err))?;

let membership = self.storage.get_membership_config().await.map_err(|err| self.map_fatal_storage_error(err))?;
self.update_membership(membership)?;
self.last_log_id = req.last_log_id;
self.last_applied = req.last_log_id.index;
self.snapshot_last_log_id = req.last_log_id;
self.last_log_id = req.meta.last_log_id;
self.last_applied = req.meta.last_log_id.index;
self.snapshot_last_log_id = req.meta.last_log_id;
self.report_metrics(Update::Ignore);
Ok(())
}
Expand Down
33 changes: 13 additions & 20 deletions async-raft/src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ use crate::error::RaftError;
use crate::error::RaftResult;
use crate::metrics::RaftMetrics;
use crate::metrics::Wait;
use crate::raft_types::SnapshotId;
use crate::AppData;
use crate::AppDataResponse;
use crate::LogId;
use crate::NodeId;
use crate::RaftNetwork;
use crate::RaftStorage;
use crate::SnapshotMeta;

struct RaftInner<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> {
tx_api: mpsc::UnboundedSender<RaftMsg<D, R>>,
Expand Down Expand Up @@ -422,21 +422,14 @@ pub struct Entry<D: AppData> {
}

impl<D: AppData> Entry<D> {
/// Create a new snapshot pointer from the given data.
///
/// ### index & term
/// The index and term of the entry being replaced by this snapshot pointer entry.
///
/// ### id
/// The ID of the associated snapshot.
///
/// ### membership
/// The cluster membership config which is contained in the snapshot, which will always be the
/// latest membership covered by the snapshot.
pub fn new_snapshot_pointer(index: u64, term: u64, id: String, membership: MembershipConfig) -> Self {
/// Create a new snapshot pointer from the given snapshot meta.
pub fn new_snapshot_pointer(meta: &SnapshotMeta) -> Self {
Entry {
log_id: LogId { term, index },
payload: EntryPayload::SnapshotPointer(EntrySnapshotPointer { id, membership }),
log_id: meta.last_log_id,
payload: EntryPayload::SnapshotPointer(EntrySnapshotPointer {
id: meta.snapshot_id.clone(),
membership: meta.membership.clone(),
}),
}
}
}
Expand Down Expand Up @@ -581,15 +574,15 @@ pub struct InstallSnapshotRequest {
pub term: u64,
/// The leader's ID. Useful in redirecting clients.
pub leader_id: u64,
/// The Id of a snapshot.
/// Every two snapshots should have different snapshot id.
pub snapshot_id: SnapshotId,
/// The snapshot replaces all log entries up through and including this log.
pub last_log_id: LogId,

/// Metadata of a snapshot: snapshot_id, last_log_ed membership etc.
pub meta: SnapshotMeta,

/// The byte offset where this chunk of data is positioned in the snapshot file.
pub offset: u64,
/// The raw bytes of the snapshot chunk, starting at `offset`.
pub data: Vec<u8>,

/// Will be `true` if this is the last chunk in the snapshot.
pub done: bool,
}
Expand Down
4 changes: 1 addition & 3 deletions async-raft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -829,7 +829,6 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>

#[tracing::instrument(level = "trace", skip(self, snapshot))]
async fn stream_snapshot(&mut self, mut snapshot: CurrentSnapshotData<S::Snapshot>) -> RaftResult<()> {
let snapshot_id = snapshot.meta.snapshot_id.clone();
let mut offset = 0;
self.core.next_index = snapshot.meta.last_log_id.index + 1;
self.core.matched = snapshot.meta.last_log_id;
Expand All @@ -842,8 +841,7 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
let req = InstallSnapshotRequest {
term: self.core.term,
leader_id: self.core.id,
snapshot_id: snapshot_id.clone(),
last_log_id: snapshot.meta.last_log_id,
meta: snapshot.meta.clone(),
offset,
data: Vec::from(&buf[..nread]),
done,
Expand Down
14 changes: 3 additions & 11 deletions async-raft/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,9 @@ where

/// Finalize the installation of a snapshot which has finished streaming from the cluster leader.
///
/// Delete all entries in the log through `delete_through`, unless `None`, in which case
/// all entries of the log are to be deleted.
/// Delete all entries in the log through `meta.last_log_id.index`.
///
/// Write a new snapshot pointer to the log at the given `index`. The snapshot pointer should be
/// Write a new snapshot pointer to the log at the given `meta.last_log_id.index`. The snapshot pointer should be
/// constructed via the `Entry::new_snapshot_pointer` constructor and the other parameters
/// provided to this method.
///
Expand All @@ -235,14 +234,7 @@ where
/// made to the snapshot.
///
/// Errors returned from this method will cause Raft to go into shutdown.
async fn finalize_snapshot_installation(
&self,
index: u64,
term: u64,
delete_through: Option<u64>,
id: String,
snapshot: Box<Self::Snapshot>,
) -> Result<()>;
async fn finalize_snapshot_installation(&self, meta: &SnapshotMeta, snapshot: Box<Self::Snapshot>) -> Result<()>;

/// Get a readable handle to the current snapshot, along with its metadata.
///
Expand Down
16 changes: 10 additions & 6 deletions async-raft/tests/api_install_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use anyhow::Result;
use async_raft::raft::InstallSnapshotRequest;
use async_raft::Config;
use async_raft::LogId;
use async_raft::SnapshotMeta;
use async_raft::State;
use fixtures::RaftRouter;
use maplit::hashset;
Expand Down Expand Up @@ -46,8 +47,11 @@ async fn snapshot_ge_half_threshold() -> Result<()> {
let req0 = InstallSnapshotRequest {
term: 1,
leader_id: 0,
snapshot_id: "ss1".into(),
last_log_id: LogId { term: 1, index: 0 },
meta: SnapshotMeta {
snapshot_id: "ss1".into(),
last_log_id: LogId { term: 1, index: 0 },
membership: Default::default(),
},
offset: 0,
data: vec![1, 2, 3],
done: false,
Expand All @@ -62,7 +66,7 @@ async fn snapshot_ge_half_threshold() -> Result<()> {
{
let mut req = req0.clone();
req.offset = 3;
req.snapshot_id = "ss2".into();
req.meta.snapshot_id = "ss2".into();
let res = n.0.install_snapshot(req).await;
assert_eq!("expect: ss1+3, got: ss2+3", res.unwrap_err().to_string());
}
Expand All @@ -71,20 +75,20 @@ async fn snapshot_ge_half_threshold() -> Result<()> {
{
let mut req = req0.clone();
req.offset = 0;
req.snapshot_id = "ss2".into();
req.meta.snapshot_id = "ss2".into();
n.0.install_snapshot(req).await?;

let mut req = req0.clone();
req.offset = 3;
req.snapshot_id = "ss2".into();
req.meta.snapshot_id = "ss2".into();
n.0.install_snapshot(req).await?;
}

tracing::info!("-- continue write with mismatched offset is allowed");
{
let mut req = req0.clone();
req.offset = 8;
req.snapshot_id = "ss2".into();
req.meta.snapshot_id = "ss2".into();
n.0.install_snapshot(req).await?;
}
Ok(())
Expand Down
Loading

0 comments on commit 0c870cc

Please sign in to comment.