Skip to content

Commit

Permalink
replace sleep() with wait_for_xxx to reduce waiting time.
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Jun 1, 2021
1 parent f31fc92 commit 6665b95
Show file tree
Hide file tree
Showing 10 changed files with 147 additions and 55 deletions.
13 changes: 9 additions & 4 deletions async-raft/tests/client_reads.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
mod fixtures;

use async_raft::State;
use maplit::hashset;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use async_raft::Config;
use tokio::time::sleep;

use fixtures::RaftRouter;

Expand All @@ -29,14 +29,19 @@ async fn client_reads() -> Result<()> {
router.new_raft_node(1).await;
router.new_raft_node(2).await;

let mut want = 0;

// Assert all nodes are in non-voter state & have no entries.
sleep(Duration::from_secs(10)).await;
router.wait_for_log(&hashset![0, 1, 2], want, "empty node").await?;
router.wait_for_state(&hashset![0, 1, 2], State::NonVoter, "empty node").await?;
router.assert_pristine_cluster().await;

// Initialize the cluster, then assert that a stable cluster was formed & held.
tracing::info!("--- initializing cluster");
router.initialize_from_single_node(0).await?;
sleep(Duration::from_secs(10)).await;
want += 1;

router.wait_for_log(&hashset![0, 1, 2], want, "init leader").await?;
router.assert_stable_cluster(Some(1), Some(1)).await;

// Get the ID of the leader, and assert that client_read succeeds.
Expand Down
36 changes: 23 additions & 13 deletions async-raft/tests/client_writes.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
mod fixtures;

use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use async_raft::raft::MembershipConfig;
use async_raft::Config;
use futures::prelude::*;
use maplit::hashset;
use tokio::time::sleep;

use async_raft::Config;
use async_raft::raft::MembershipConfig;
use async_raft::State;
use fixtures::RaftRouter;

mod fixtures;

/// Client write tests.
///
/// What does this test do?
Expand All @@ -32,15 +31,23 @@ async fn client_writes() -> Result<()> {
router.new_raft_node(1).await;
router.new_raft_node(2).await;

let mut want = 0;

// Assert all nodes are in non-voter state & have no entries.
sleep(Duration::from_secs(10)).await;
router.wait_for_log(&hashset![0, 1, 2], want, "empty").await?;
router.wait_for_state(&hashset![0, 1, 2], State::NonVoter, "empty").await?;
router.assert_pristine_cluster().await;

// Initialize the cluster, then assert that a stable cluster was formed & held.
tracing::info!("--- initializing cluster");
router.initialize_from_single_node(0).await?;
sleep(Duration::from_secs(10)).await;
router.assert_stable_cluster(Some(1), Some(1)).await;
want += 1;

router.wait_for_log(&hashset![0, 1, 2], want, "leader init log").await?;
router.wait_for_state(&hashset![0], State::Leader, "init").await?;

router.assert_stable_cluster(Some(1), Some(want)).await;


// Write a bunch of data and assert that the cluster stayes stable.
let leader = router.leader().await.expect("leader not found");
Expand All @@ -52,14 +59,17 @@ async fn client_writes() -> Result<()> {
clients.push(router.client_request_many(leader, "4", 1000));
clients.push(router.client_request_many(leader, "5", 1000));
while clients.next().await.is_some() {}
sleep(Duration::from_secs(5)).await; // Ensure enough time is given for replication (this is WAY more than enough).
router.assert_stable_cluster(Some(1), Some(6001)).await; // The extra 1 is from the leader's initial commit entry.

want = 6001;
router.wait_for_log(&hashset![0, 1, 2], want, "sync logs").await?;

router.assert_stable_cluster(Some(1), Some(want)).await; // The extra 1 is from the leader's initial commit entry.
router
.assert_storage_state(
1,
6001,
want,
Some(0),
6001,
want,
Some((
(5000..5100).into(),
1,
Expand Down
29 changes: 21 additions & 8 deletions async-raft/tests/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ mod fixtures;

use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;

use anyhow::Result;
use async_raft::raft::MembershipConfig;
use async_raft::{Config, SnapshotPolicy};
use async_raft::{Config, SnapshotPolicy, State};
use maplit::hashset;
use tokio::time::sleep;

use fixtures::RaftRouter;

Expand All @@ -34,20 +34,31 @@ async fn compaction() -> Result<()> {
let router = Arc::new(RaftRouter::new(config.clone()));
router.new_raft_node(0).await;

let mut want = 0;

// Assert all nodes are in non-voter state & have no entries.
sleep(Duration::from_secs(10)).await;
router.wait_for_log(&hashset![0], want, "empty").await?;
router.wait_for_state(&hashset![0], State::NonVoter, "empty").await?;
router.assert_pristine_cluster().await;

// Initialize the cluster, then assert that a stable cluster was formed & held.
tracing::info!("--- initializing cluster");
router.initialize_from_single_node(0).await?;
sleep(Duration::from_secs(10)).await;
want += 1;

router.wait_for_log(&hashset![0], want, "init leader").await?;
router.assert_stable_cluster(Some(1), Some(1)).await;

// Send enough requests to the cluster that compaction on the node should be triggered.
router.client_request_many(0, "0", 499).await; // Puts us exactly at the configured snapshot policy threshold.
sleep(Duration::from_secs(5)).await; // Wait to ensure there is enough time for a snapshot to be built (this is way more than enough).
router.assert_stable_cluster(Some(1), Some(500)).await;
want += 499;

router.wait_for_log(&hashset![0], want, "write").await?;
router.assert_stable_cluster(Some(1), Some(want)).await;

// TODO: add snapshot info into metrics.
// Then watch metrics instead of waiting.
sleep(Duration::from_secs(10)).await;
router
.assert_storage_state(
1,
Expand All @@ -72,8 +83,10 @@ async fn compaction() -> Result<()> {
.change_membership(0, hashset![0, 1])
.await
.expect("failed to modify cluster membership");
sleep(Duration::from_secs(5)).await; // Wait to ensure metrics are updated (this is way more than enough).
router.assert_stable_cluster(Some(1), Some(502)).await; // We expect index to be 500 + 2 (joint & uniform config change entries).
want += 2; // 2 member change logs

router.wait_for_log(&hashset![0, 1], want, "add follower").await?;
router.assert_stable_cluster(Some(1), Some(want)).await; // We expect index to be 500 + 2 (joint & uniform config change entries).
let expected_snap = Some((
500.into(),
1,
Expand Down
15 changes: 10 additions & 5 deletions async-raft/tests/current_leader.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
mod fixtures;

use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use async_raft::Config;
use tokio::time::sleep;
use async_raft::State;
use maplit::hashset;

use fixtures::RaftRouter;

Expand All @@ -28,15 +28,20 @@ async fn current_leader() -> Result<()> {
router.new_raft_node(1).await;
router.new_raft_node(2).await;

let mut want = 0;

// Assert all nodes are in non-voter state & have no entries.
sleep(Duration::from_secs(10)).await;
router.wait_for_log(&hashset![0, 1, 2], want, "empty").await?;
router.wait_for_state(&hashset![0, 1, 2], State::NonVoter, "empty").await?;
router.assert_pristine_cluster().await;

// Initialize the cluster, then assert that a stable cluster was formed & held.
tracing::info!("--- initializing cluster");
router.initialize_from_single_node(0).await?;
sleep(Duration::from_secs(10)).await;
router.assert_stable_cluster(Some(1), Some(1)).await;
want += 1;

router.wait_for_log(&hashset![0, 1, 2], want, "init").await?;
router.assert_stable_cluster(Some(1), Some(want)).await;

// Get the ID of the leader, and assert that current_leader succeeds.
let leader = router.leader().await.expect("leader not found");
Expand Down
18 changes: 13 additions & 5 deletions async-raft/tests/dynamic_membership.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::time::Duration;

use anyhow::Result;
use async_raft::Config;
use async_raft::State;
use futures::stream::StreamExt;
use maplit::hashset;
use tokio::time::sleep;
Expand All @@ -31,15 +32,20 @@ async fn dynamic_membership() -> Result<()> {
let router = Arc::new(RaftRouter::new(config.clone()));
router.new_raft_node(0).await;

let mut want = 0;

// Assert all nodes are in non-voter state & have no entries.
sleep(Duration::from_secs(3)).await;
router.wait_for_log(&hashset![0], want, "empty").await?;
router.wait_for_state(&hashset![0], State::NonVoter, "empty").await?;
router.assert_pristine_cluster().await;

// Initialize the cluster, then assert that a stable cluster was formed & held.
tracing::info!("--- initializing cluster");
router.initialize_from_single_node(0).await?;
sleep(Duration::from_secs(3)).await;
router.assert_stable_cluster(Some(1), Some(1)).await;
want += 1;

router.wait_for_log(&hashset![0], want, "init").await?;
router.assert_stable_cluster(Some(1), Some(want)).await;

// Sync some new nodes.
router.new_raft_node(1).await;
Expand All @@ -57,8 +63,10 @@ async fn dynamic_membership() -> Result<()> {
}
tracing::info!("--- changing cluster config");
router.change_membership(0, hashset![0, 1, 2, 3, 4]).await?;
sleep(Duration::from_secs(5)).await;
router.assert_stable_cluster(Some(1), Some(3)).await; // Still in term 1, so leader is still node 0.
want += 2;

router.wait_for_log(&hashset![0, 1, 2, 3, 4], want, "cluster of 5 candidates").await?;
router.assert_stable_cluster(Some(1), Some(want)).await; // Still in term 1, so leader is still node 0.

// Isolate old leader and assert that a new leader takes over.
tracing::info!("--- isolating master node 0");
Expand Down
15 changes: 10 additions & 5 deletions async-raft/tests/initialization.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
mod fixtures;

use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use async_raft::Config;
use tokio::time::sleep;
use async_raft::State;
use maplit::hashset;

use fixtures::RaftRouter;

Expand All @@ -32,15 +32,20 @@ async fn initialization() -> Result<()> {
router.new_raft_node(1).await;
router.new_raft_node(2).await;

let mut want = 0;

// Assert all nodes are in non-voter state & have no entries.
sleep(Duration::from_secs(10)).await;
router.wait_for_log(&hashset![0, 1, 2], want, "empty").await?;
router.wait_for_state(&hashset![0, 1, 2], State::NonVoter, "empty").await?;
router.assert_pristine_cluster().await;

// Initialize the cluster, then assert that a stable cluster was formed & held.
tracing::info!("--- initializing cluster");
router.initialize_from_single_node(0).await?;
sleep(Duration::from_secs(10)).await;
router.assert_stable_cluster(Some(1), Some(1)).await;
want += 1;

router.wait_for_log(&hashset![0, 1, 2], want, "init").await?;
router.assert_stable_cluster(Some(1), Some(want)).await;

Ok(())
}
11 changes: 9 additions & 2 deletions async-raft/tests/non_voter_restart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,23 @@ async fn non_voter_restart() -> Result<()> {
router.new_raft_node(0).await;
router.new_raft_node(1).await;

let mut want = 0;

// Assert all nodes are in non-voter state & have no entries.
sleep(Duration::from_secs(2)).await;
router.wait_for_log(&hashset![0, 1], want, "empty").await?;
router.wait_for_state(&hashset![0, 1], State::NonVoter, "empty").await?;
router.assert_pristine_cluster().await;

tracing::info!("--- initializing single node cluster");

router.initialize_with(0, hashset![0]).await?;
want += 1;

router.add_non_voter(0, 1).await?;
router.client_request(0, "foo", 1).await;
sleep(Duration::from_secs(2)).await;
want += 1;

router.wait_for_log(&hashset![0, 1], want, "write one log").await?;

let (node0, _sto0) = router.remove_node(0).await.unwrap();
assert_node_state(0, &node0, 1, 2, State::Leader);
Expand Down
13 changes: 9 additions & 4 deletions async-raft/tests/shutdown.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
mod fixtures;

use std::sync::Arc;
use std::time::Duration;

use anyhow::{anyhow, Result};
use async_raft::Config;
use tokio::time::sleep;
use async_raft::State;
use maplit::hashset;

use fixtures::RaftRouter;

Expand All @@ -29,14 +29,19 @@ async fn initialization() -> Result<()> {
router.new_raft_node(1).await;
router.new_raft_node(2).await;

let mut want = 0;

// Assert all nodes are in non-voter state & have no entries.
sleep(Duration::from_secs(10)).await;
router.wait_for_log(&hashset![0, 1, 2], want, "empty").await?;
router.wait_for_state(&hashset![0, 1, 2], State::NonVoter, "empty").await?;
router.assert_pristine_cluster().await;

// Initialize the cluster, then assert that a stable cluster was formed & held.
tracing::info!("--- initializing cluster");
router.initialize_from_single_node(0).await?;
sleep(Duration::from_secs(10)).await;
want += 1;

router.wait_for_log(&hashset![0, 1, 2], want, "init").await?;
router.assert_stable_cluster(Some(1), Some(1)).await;

tracing::info!("--- performing node shutdowns");
Expand Down
13 changes: 9 additions & 4 deletions async-raft/tests/singlenode.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
mod fixtures;

use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use async_raft::Config;
use tokio::time::sleep;
use async_raft::State;
use maplit::hashset;

use fixtures::RaftRouter;

Expand All @@ -29,14 +29,19 @@ async fn singlenode() -> Result<()> {
let router = Arc::new(RaftRouter::new(config.clone()));
router.new_raft_node(0).await;

let mut want = 0;

// Assert all nodes are in non-voter state & have no entries.
sleep(Duration::from_secs(10)).await;
router.wait_for_log(&hashset![0], want, "empty").await?;
router.wait_for_state(&hashset![0], State::NonVoter, "empty").await?;
router.assert_pristine_cluster().await;

// Initialize the cluster, then assert that a stable cluster was formed & held.
tracing::info!("--- initializing cluster");
router.initialize_from_single_node(0).await?;
sleep(Duration::from_secs(10)).await;
want += 1;

router.wait_for_log(&hashset![0], want, "init").await?;
router.assert_stable_cluster(Some(1), Some(1)).await;

// Write some data to the single node cluster.
Expand Down
Loading

0 comments on commit 6665b95

Please sign in to comment.