Skip to content

Commit

Permalink
Feature: adding a snapshot finalize timeout config
Browse files Browse the repository at this point in the history
  • Loading branch information
zach-schoenberger committed Nov 9, 2022
1 parent 6cac90c commit 07a2a67
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 1 deletion.
4 changes: 4 additions & 0 deletions openraft/src/config/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ pub struct Config {
#[clap(long, default_value = "200")]
pub install_snapshot_timeout: u64,

/// The timeout for sending the last snapshot segment, in seconds
#[clap(long, default_value = "300")]
pub finalize_snapshot_timeout: u64,

/// The maximum number of entries per payload allowed to be transmitted during replication
///
/// If this is too low, it will take longer for the nodes to be brought up to
Expand Down
1 change: 1 addition & 0 deletions openraft/src/core/install_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
} else {
unreachable!("It has to be Streaming")
};

if stream_changed {
self.begin_installing_snapshot(&req).await?;
}
Expand Down
13 changes: 12 additions & 1 deletion openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ pub(crate) struct ReplicationCore<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S
/// The timeout for sending snapshot segment.
install_snapshot_timeout: Duration,

/// The timeout for sending the last snapshot segment. This is when finalize is called, which can often be longer.
finalize_snapshot_timeout: Duration,

/// if or not need to replicate log entries or states, e.g., `commit_index` etc.
need_to_replicate: bool,
}
Expand All @@ -149,6 +152,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
// other component to ReplicationStream
let (repl_tx, repl_rx) = mpsc::unbounded_channel();
let install_snapshot_timeout = Duration::from_millis(config.install_snapshot_timeout);
let finalize_snapshot_timeout = Duration::from_millis(config.finalize_snapshot_timeout);

let this = Self {
target,
Expand All @@ -164,6 +168,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
raft_core_tx,
repl_rx,
install_snapshot_timeout,
finalize_snapshot_timeout,
need_to_replicate: true,
};

Expand Down Expand Up @@ -751,7 +756,13 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
"sending snapshot chunk"
);

let res = timeout(self.install_snapshot_timeout, self.network.send_install_snapshot(req)).await;
let snap_timeout = if done {
self.install_snapshot_timeout
} else {
self.finalize_snapshot_timeout
};

let res = timeout(snap_timeout, self.network.send_install_snapshot(req)).await;

let res = match res {
Ok(outer_res) => match outer_res {
Expand Down

0 comments on commit 07a2a67

Please sign in to comment.