diff --git a/async-raft/tests/client_reads.rs b/async-raft/tests/client_reads.rs index 3503db531..98d5accb7 100644 --- a/async-raft/tests/client_reads.rs +++ b/async-raft/tests/client_reads.rs @@ -1,11 +1,11 @@ mod fixtures; +use async_raft::State; +use maplit::hashset; use std::sync::Arc; -use std::time::Duration; use anyhow::Result; use async_raft::Config; -use tokio::time::sleep; use fixtures::RaftRouter; @@ -29,14 +29,19 @@ async fn client_reads() -> Result<()> { router.new_raft_node(1).await; router.new_raft_node(2).await; + let mut want = 0; + // Assert all nodes are in non-voter state & have no entries. - sleep(Duration::from_secs(10)).await; + router.wait_for_log(&hashset![0, 1, 2], want, "empty node").await?; + router.wait_for_state(&hashset![0, 1, 2], State::NonVoter, "empty node").await?; router.assert_pristine_cluster().await; // Initialize the cluster, then assert that a stable cluster was formed & held. tracing::info!("--- initializing cluster"); router.initialize_from_single_node(0).await?; - sleep(Duration::from_secs(10)).await; + want += 1; + + router.wait_for_log(&hashset![0, 1, 2], want, "init leader").await?; router.assert_stable_cluster(Some(1), Some(1)).await; // Get the ID of the leader, and assert that client_read succeeds. diff --git a/async-raft/tests/client_writes.rs b/async-raft/tests/client_writes.rs index 9541702c9..20e353e72 100644 --- a/async-raft/tests/client_writes.rs +++ b/async-raft/tests/client_writes.rs @@ -1,17 +1,16 @@ -mod fixtures; - use std::sync::Arc; -use std::time::Duration; use anyhow::Result; -use async_raft::raft::MembershipConfig; -use async_raft::Config; use futures::prelude::*; use maplit::hashset; -use tokio::time::sleep; +use async_raft::Config; +use async_raft::raft::MembershipConfig; +use async_raft::State; use fixtures::RaftRouter; +mod fixtures; + /// Client write tests. /// /// What does this test do? @@ -32,15 +31,23 @@ async fn client_writes() -> Result<()> { router.new_raft_node(1).await; router.new_raft_node(2).await; + let mut want = 0; + // Assert all nodes are in non-voter state & have no entries. - sleep(Duration::from_secs(10)).await; + router.wait_for_log(&hashset![0, 1, 2], want, "empty").await?; + router.wait_for_state(&hashset![0, 1, 2], State::NonVoter, "empty").await?; router.assert_pristine_cluster().await; // Initialize the cluster, then assert that a stable cluster was formed & held. tracing::info!("--- initializing cluster"); router.initialize_from_single_node(0).await?; - sleep(Duration::from_secs(10)).await; - router.assert_stable_cluster(Some(1), Some(1)).await; + want += 1; + + router.wait_for_log(&hashset![0, 1, 2], want, "leader init log").await?; + router.wait_for_state(&hashset![0], State::Leader, "init").await?; + + router.assert_stable_cluster(Some(1), Some(want)).await; + // Write a bunch of data and assert that the cluster stayes stable. let leader = router.leader().await.expect("leader not found"); @@ -52,14 +59,17 @@ async fn client_writes() -> Result<()> { clients.push(router.client_request_many(leader, "4", 1000)); clients.push(router.client_request_many(leader, "5", 1000)); while clients.next().await.is_some() {} - sleep(Duration::from_secs(5)).await; // Ensure enough time is given for replication (this is WAY more than enough). - router.assert_stable_cluster(Some(1), Some(6001)).await; // The extra 1 is from the leader's initial commit entry. + + want = 6001; + router.wait_for_log(&hashset![0, 1, 2], want, "sync logs").await?; + + router.assert_stable_cluster(Some(1), Some(want)).await; // The extra 1 is from the leader's initial commit entry. router .assert_storage_state( 1, - 6001, + want, Some(0), - 6001, + want, Some(( (5000..5100).into(), 1, diff --git a/async-raft/tests/compaction.rs b/async-raft/tests/compaction.rs index b72d0b059..b94d2b169 100644 --- a/async-raft/tests/compaction.rs +++ b/async-raft/tests/compaction.rs @@ -2,12 +2,12 @@ mod fixtures; use std::sync::Arc; use std::time::Duration; +use tokio::time::sleep; use anyhow::Result; use async_raft::raft::MembershipConfig; -use async_raft::{Config, SnapshotPolicy}; +use async_raft::{Config, SnapshotPolicy, State}; use maplit::hashset; -use tokio::time::sleep; use fixtures::RaftRouter; @@ -34,20 +34,31 @@ async fn compaction() -> Result<()> { let router = Arc::new(RaftRouter::new(config.clone())); router.new_raft_node(0).await; + let mut want = 0; + // Assert all nodes are in non-voter state & have no entries. - sleep(Duration::from_secs(10)).await; + router.wait_for_log(&hashset![0], want, "empty").await?; + router.wait_for_state(&hashset![0], State::NonVoter, "empty").await?; router.assert_pristine_cluster().await; // Initialize the cluster, then assert that a stable cluster was formed & held. tracing::info!("--- initializing cluster"); router.initialize_from_single_node(0).await?; - sleep(Duration::from_secs(10)).await; + want += 1; + + router.wait_for_log(&hashset![0], want, "init leader").await?; router.assert_stable_cluster(Some(1), Some(1)).await; // Send enough requests to the cluster that compaction on the node should be triggered. router.client_request_many(0, "0", 499).await; // Puts us exactly at the configured snapshot policy threshold. - sleep(Duration::from_secs(5)).await; // Wait to ensure there is enough time for a snapshot to be built (this is way more than enough). - router.assert_stable_cluster(Some(1), Some(500)).await; + want += 499; + + router.wait_for_log(&hashset![0], want, "write").await?; + router.assert_stable_cluster(Some(1), Some(want)).await; + + // TODO: add snapshot info into metrics. + // Then watch metrics instead of waiting. + sleep(Duration::from_secs(10)).await; router .assert_storage_state( 1, @@ -72,8 +83,10 @@ async fn compaction() -> Result<()> { .change_membership(0, hashset![0, 1]) .await .expect("failed to modify cluster membership"); - sleep(Duration::from_secs(5)).await; // Wait to ensure metrics are updated (this is way more than enough). - router.assert_stable_cluster(Some(1), Some(502)).await; // We expect index to be 500 + 2 (joint & uniform config change entries). + want += 2; // 2 member change logs + + router.wait_for_log(&hashset![0, 1], want, "add follower").await?; + router.assert_stable_cluster(Some(1), Some(want)).await; // We expect index to be 500 + 2 (joint & uniform config change entries). let expected_snap = Some(( 500.into(), 1, diff --git a/async-raft/tests/current_leader.rs b/async-raft/tests/current_leader.rs index 0050110e4..78cc4e1ed 100644 --- a/async-raft/tests/current_leader.rs +++ b/async-raft/tests/current_leader.rs @@ -1,11 +1,11 @@ mod fixtures; use std::sync::Arc; -use std::time::Duration; use anyhow::Result; use async_raft::Config; -use tokio::time::sleep; +use async_raft::State; +use maplit::hashset; use fixtures::RaftRouter; @@ -28,15 +28,20 @@ async fn current_leader() -> Result<()> { router.new_raft_node(1).await; router.new_raft_node(2).await; + let mut want = 0; + // Assert all nodes are in non-voter state & have no entries. - sleep(Duration::from_secs(10)).await; + router.wait_for_log(&hashset![0, 1, 2], want, "empty").await?; + router.wait_for_state(&hashset![0, 1, 2], State::NonVoter, "empty").await?; router.assert_pristine_cluster().await; // Initialize the cluster, then assert that a stable cluster was formed & held. tracing::info!("--- initializing cluster"); router.initialize_from_single_node(0).await?; - sleep(Duration::from_secs(10)).await; - router.assert_stable_cluster(Some(1), Some(1)).await; + want += 1; + + router.wait_for_log(&hashset![0, 1, 2], want, "init").await?; + router.assert_stable_cluster(Some(1), Some(want)).await; // Get the ID of the leader, and assert that current_leader succeeds. let leader = router.leader().await.expect("leader not found"); diff --git a/async-raft/tests/dynamic_membership.rs b/async-raft/tests/dynamic_membership.rs index 439da2b66..a6e576873 100644 --- a/async-raft/tests/dynamic_membership.rs +++ b/async-raft/tests/dynamic_membership.rs @@ -5,6 +5,7 @@ use std::time::Duration; use anyhow::Result; use async_raft::Config; +use async_raft::State; use futures::stream::StreamExt; use maplit::hashset; use tokio::time::sleep; @@ -31,15 +32,20 @@ async fn dynamic_membership() -> Result<()> { let router = Arc::new(RaftRouter::new(config.clone())); router.new_raft_node(0).await; + let mut want = 0; + // Assert all nodes are in non-voter state & have no entries. - sleep(Duration::from_secs(3)).await; + router.wait_for_log(&hashset![0], want, "empty").await?; + router.wait_for_state(&hashset![0], State::NonVoter, "empty").await?; router.assert_pristine_cluster().await; // Initialize the cluster, then assert that a stable cluster was formed & held. tracing::info!("--- initializing cluster"); router.initialize_from_single_node(0).await?; - sleep(Duration::from_secs(3)).await; - router.assert_stable_cluster(Some(1), Some(1)).await; + want += 1; + + router.wait_for_log(&hashset![0], want, "init").await?; + router.assert_stable_cluster(Some(1), Some(want)).await; // Sync some new nodes. router.new_raft_node(1).await; @@ -57,8 +63,10 @@ async fn dynamic_membership() -> Result<()> { } tracing::info!("--- changing cluster config"); router.change_membership(0, hashset![0, 1, 2, 3, 4]).await?; - sleep(Duration::from_secs(5)).await; - router.assert_stable_cluster(Some(1), Some(3)).await; // Still in term 1, so leader is still node 0. + want += 2; + + router.wait_for_log(&hashset![0, 1, 2, 3, 4], want, "cluster of 5 candidates").await?; + router.assert_stable_cluster(Some(1), Some(want)).await; // Still in term 1, so leader is still node 0. // Isolate old leader and assert that a new leader takes over. tracing::info!("--- isolating master node 0"); diff --git a/async-raft/tests/initialization.rs b/async-raft/tests/initialization.rs index 604526227..15ba913e6 100644 --- a/async-raft/tests/initialization.rs +++ b/async-raft/tests/initialization.rs @@ -1,11 +1,11 @@ mod fixtures; use std::sync::Arc; -use std::time::Duration; use anyhow::Result; use async_raft::Config; -use tokio::time::sleep; +use async_raft::State; +use maplit::hashset; use fixtures::RaftRouter; @@ -32,15 +32,20 @@ async fn initialization() -> Result<()> { router.new_raft_node(1).await; router.new_raft_node(2).await; + let mut want = 0; + // Assert all nodes are in non-voter state & have no entries. - sleep(Duration::from_secs(10)).await; + router.wait_for_log(&hashset![0, 1, 2], want, "empty").await?; + router.wait_for_state(&hashset![0, 1, 2], State::NonVoter, "empty").await?; router.assert_pristine_cluster().await; // Initialize the cluster, then assert that a stable cluster was formed & held. tracing::info!("--- initializing cluster"); router.initialize_from_single_node(0).await?; - sleep(Duration::from_secs(10)).await; - router.assert_stable_cluster(Some(1), Some(1)).await; + want += 1; + + router.wait_for_log(&hashset![0, 1, 2], want, "init").await?; + router.assert_stable_cluster(Some(1), Some(want)).await; Ok(()) } diff --git a/async-raft/tests/non_voter_restart.rs b/async-raft/tests/non_voter_restart.rs index 8e46763e3..12049573f 100644 --- a/async-raft/tests/non_voter_restart.rs +++ b/async-raft/tests/non_voter_restart.rs @@ -35,16 +35,23 @@ async fn non_voter_restart() -> Result<()> { router.new_raft_node(0).await; router.new_raft_node(1).await; + let mut want = 0; + // Assert all nodes are in non-voter state & have no entries. - sleep(Duration::from_secs(2)).await; + router.wait_for_log(&hashset![0, 1], want, "empty").await?; + router.wait_for_state(&hashset![0, 1], State::NonVoter, "empty").await?; router.assert_pristine_cluster().await; tracing::info!("--- initializing single node cluster"); router.initialize_with(0, hashset![0]).await?; + want += 1; + router.add_non_voter(0, 1).await?; router.client_request(0, "foo", 1).await; - sleep(Duration::from_secs(2)).await; + want += 1; + + router.wait_for_log(&hashset![0, 1], want, "write one log").await?; let (node0, _sto0) = router.remove_node(0).await.unwrap(); assert_node_state(0, &node0, 1, 2, State::Leader); diff --git a/async-raft/tests/shutdown.rs b/async-raft/tests/shutdown.rs index d80b85e6d..ce93af6fd 100644 --- a/async-raft/tests/shutdown.rs +++ b/async-raft/tests/shutdown.rs @@ -1,11 +1,11 @@ mod fixtures; use std::sync::Arc; -use std::time::Duration; use anyhow::{anyhow, Result}; use async_raft::Config; -use tokio::time::sleep; +use async_raft::State; +use maplit::hashset; use fixtures::RaftRouter; @@ -29,14 +29,19 @@ async fn initialization() -> Result<()> { router.new_raft_node(1).await; router.new_raft_node(2).await; + let mut want = 0; + // Assert all nodes are in non-voter state & have no entries. - sleep(Duration::from_secs(10)).await; + router.wait_for_log(&hashset![0, 1, 2], want, "empty").await?; + router.wait_for_state(&hashset![0, 1, 2], State::NonVoter, "empty").await?; router.assert_pristine_cluster().await; // Initialize the cluster, then assert that a stable cluster was formed & held. tracing::info!("--- initializing cluster"); router.initialize_from_single_node(0).await?; - sleep(Duration::from_secs(10)).await; + want += 1; + + router.wait_for_log(&hashset![0, 1, 2], want, "init").await?; router.assert_stable_cluster(Some(1), Some(1)).await; tracing::info!("--- performing node shutdowns"); diff --git a/async-raft/tests/singlenode.rs b/async-raft/tests/singlenode.rs index f4142c27a..3b07e1024 100644 --- a/async-raft/tests/singlenode.rs +++ b/async-raft/tests/singlenode.rs @@ -1,11 +1,11 @@ mod fixtures; use std::sync::Arc; -use std::time::Duration; use anyhow::Result; use async_raft::Config; -use tokio::time::sleep; +use async_raft::State; +use maplit::hashset; use fixtures::RaftRouter; @@ -29,14 +29,19 @@ async fn singlenode() -> Result<()> { let router = Arc::new(RaftRouter::new(config.clone())); router.new_raft_node(0).await; + let mut want = 0; + // Assert all nodes are in non-voter state & have no entries. - sleep(Duration::from_secs(10)).await; + router.wait_for_log(&hashset![0], want, "empty").await?; + router.wait_for_state(&hashset![0], State::NonVoter, "empty").await?; router.assert_pristine_cluster().await; // Initialize the cluster, then assert that a stable cluster was formed & held. tracing::info!("--- initializing cluster"); router.initialize_from_single_node(0).await?; - sleep(Duration::from_secs(10)).await; + want += 1; + + router.wait_for_log(&hashset![0], want, "init").await?; router.assert_stable_cluster(Some(1), Some(1)).await; // Write some data to the single node cluster. diff --git a/async-raft/tests/stepdown.rs b/async-raft/tests/stepdown.rs index 15b95c53d..e519e3786 100644 --- a/async-raft/tests/stepdown.rs +++ b/async-raft/tests/stepdown.rs @@ -30,14 +30,19 @@ async fn stepdown() -> Result<()> { router.new_raft_node(0).await; router.new_raft_node(1).await; + let mut want = 0; + // Assert all nodes are in non-voter state & have no entries. - sleep(Duration::from_secs(3)).await; + router.wait_for_log(&hashset![0, 1], want, "empty").await?; + router.wait_for_state(&hashset![0, 1], State::NonVoter, "empty").await?; router.assert_pristine_cluster().await; // Initialize the cluster, then assert that a stable cluster was formed & held. tracing::info!("--- initializing cluster"); router.initialize_from_single_node(0).await?; - sleep(Duration::from_secs(3)).await; + want += 1; + + router.wait_for_log(&hashset![0, 1], want, "init").await?; router.assert_stable_cluster(Some(1), Some(1)).await; // Submit a config change which adds two new nodes and removes the current leader. @@ -46,7 +51,21 @@ async fn stepdown() -> Result<()> { router.new_raft_node(2).await; router.new_raft_node(3).await; router.change_membership(orig_leader, hashset![1, 2, 3]).await?; - sleep(Duration::from_secs(5)).await; // Give time for step down metrics to flow through. + want += 2; + + for id in 0..4 { + if id == orig_leader { + router.wait_for_log(&hashset![id], want, "update membership: 1, 2, 3; old leader").await?; + } else { + // a new leader elected and propose a log + router + .wait_for_log(&hashset![id], want + 1, "update membership: 1, 2, 3; new candidate") + .await?; + } + } + + // leader commit a new log. + want += 1; // Assert on the state of the old leader. { @@ -85,8 +104,18 @@ async fn stepdown() -> Result<()> { // Assert that the current cluster is stable. let _ = router.remove_node(0).await; sleep(Duration::from_secs(5)).await; // Give time for a new leader to be elected. - router.assert_stable_cluster(Some(2), Some(4)).await; - router.assert_storage_state(2, 4, None, 0, None).await; + + // All metrics should be identical. Just use the first one. + let metrics = &router.latest_metrics().await[0]; + + // It may take more than one round to establish a leader. + // As leader established it commits a Blank log. + // If the election takes only one round, the expected term/index is 2/4. + tracing::info!("term: {}", metrics.current_term); + tracing::info!("index: {}", metrics.last_log_index); + assert!(metrics.current_term >= 2, "term incr when leader changes"); + router.assert_stable_cluster(Some(metrics.current_term), Some(want)).await; + router.assert_storage_state(metrics.current_term, want, None, 0, None).await; // ----------------------------------- ^^^ this is `0` instead of `4` because blank payloads from new leaders // and config change entries are never applied to the state machine.