From d1b3b23219433ff594d7249e086bd1e95ef5b8e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Sun, 12 Feb 2023 22:08:04 +0800 Subject: [PATCH] Change: remove RaftNetworkFactory::ConnectionError and AddLearnerError::NetworkError `RaftNetworkFactory::new_client()` does not return an error because openraft can only ignore it. Therefore it should **not** create a connection but rather a client that will connect when required. Thus there is chance it will build a client that is unable to send out anything, e.g., in case the Node network address is configured incorrectly. Because of the above change, And `AddLearnerError` will not include a NetworkError any more, because when adding a learner, the connectivity can not be effectively detected. Upgrade tip: Just update the application network implementation so that it compiles. --- .../src/network/raft_network_impl.rs | 11 ++-- .../src/network/raft_network_impl.rs | 9 +--- openraft/src/core/raft_core.rs | 53 +++++-------------- openraft/src/error.rs | 11 +--- openraft/src/network.rs | 17 ++---- .../t10_conflict_with_empty_entries.rs | 6 +-- .../append_entries/t10_see_higher_vote.rs | 2 +- .../t50_append_entries_with_bigger_term.rs | 2 +- openraft/tests/fixtures/mod.rs | 14 ++--- .../tests/log_compaction/t10_compaction.rs | 2 +- .../t41_snapshot_overrides_membership.rs | 2 +- .../t43_snapshot_delete_conflict_logs.rs | 4 +- 12 files changed, 36 insertions(+), 97 deletions(-) diff --git a/examples/raft-kv-memstore/src/network/raft_network_impl.rs b/examples/raft-kv-memstore/src/network/raft_network_impl.rs index 82c48d784..5e84497b8 100644 --- a/examples/raft-kv-memstore/src/network/raft_network_impl.rs +++ b/examples/raft-kv-memstore/src/network/raft_network_impl.rs @@ -59,18 +59,13 @@ impl ExampleNetwork { #[async_trait] impl RaftNetworkFactory for ExampleNetwork { type Network = ExampleNetworkConnection; - type ConnectionError = NetworkError; - async fn new_client( - &mut self, - target: ExampleNodeId, - node: &BasicNode, - ) -> Result { - Ok(ExampleNetworkConnection { + async fn new_client(&mut self, target: ExampleNodeId, node: &BasicNode) -> Self::Network { + ExampleNetworkConnection { owner: ExampleNetwork {}, target, target_node: node.clone(), - }) + } } } diff --git a/examples/raft-kv-rocksdb/src/network/raft_network_impl.rs b/examples/raft-kv-rocksdb/src/network/raft_network_impl.rs index 7402d3ace..a2f67c393 100644 --- a/examples/raft-kv-rocksdb/src/network/raft_network_impl.rs +++ b/examples/raft-kv-rocksdb/src/network/raft_network_impl.rs @@ -32,16 +32,11 @@ pub struct ExampleNetwork {} #[async_trait] impl RaftNetworkFactory for ExampleNetwork { type Network = ExampleNetworkConnection; - type ConnectionError = NetworkError; - async fn new_client( - &mut self, - target: ExampleNodeId, - node: &ExampleNode, - ) -> Result { + async fn new_client(&mut self, target: ExampleNodeId, node: &ExampleNode) -> Self::Network { let addr = format!("ws://{}", node.rpc_addr); let client = Client::dial_websocket(&addr).await.ok(); - Ok(ExampleNetworkConnection { addr, client, target }) + ExampleNetworkConnection { addr, client, target } } } diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 183fd4bb0..0f69a2d12 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -39,7 +39,6 @@ use crate::engine::Command; use crate::engine::Engine; use crate::engine::SendResult; use crate::entry::EntryRef; -use crate::error::AddLearnerError; use crate::error::ChangeMembershipError; use crate::error::CheckIsLeaderError; use crate::error::ClientWriteError; @@ -50,7 +49,6 @@ use crate::error::InProgress; use crate::error::InitializeError; use crate::error::LearnerIsLagging; use crate::error::LearnerNotFound; -use crate::error::NetworkError; use crate::error::QuorumNotEnough; use crate::error::RPCError; use crate::error::Timeout; @@ -271,13 +269,7 @@ impl, S: RaftStorage> RaftCore n, - Err(e) => { - tracing::error!(target = display(target), "Failed to create client, this is a non recoverable error, the node will be permanently ignored! {}", e); - continue; - } - }; + let mut client = self.network.new_client(target, &target_node).await; let ttl = Duration::from_millis(self.config.heartbeat_interval); @@ -414,13 +406,6 @@ impl, S: RaftStorage> RaftCore, S: RaftStorage> RaftCore, - ) -> Result, N::ConnectionError> { + ) -> ReplicationHandle { // Safe unwrap(): target must be in membership let target_node = self.engine.state.membership_state.effective().get_node(&target).unwrap(); let membership_log_id = self.engine.state.membership_state.effective().log_id; - let network = self.network.new_client(target, target_node).await?; + let network = self.network.new_client(target, target_node).await; let session_id = ReplicationSessionId::new(*self.engine.state.get_vote(), membership_log_id); - Ok(ReplicationCore::::spawn( + ReplicationCore::::spawn( target, session_id, self.config.clone(), @@ -953,7 +938,7 @@ impl, S: RaftStorage> RaftCore, S: RaftStorage> RaftCore n, - Err(err) => { - tracing::error!({error=%err, target=display(target)}, "while requesting vote"); - continue; - } - }; + let mut client = self.network.new_client(target, &target_node).await; let tx = self.tx_api.clone(); @@ -1551,19 +1530,13 @@ impl, S: RaftStorage> RaftRuntime self.remove_all_replication().await; for (target, matching) in targets.iter() { - match self.spawn_replication_stream(*target, *matching).await { - Ok(state) => { - if let Some(l) = &mut self.leader_data { - l.nodes.insert(*target, state); - } else { - unreachable!("it has to be a leader!!!"); - } - } - Err(e) => { - tracing::error!({node = % target}, "cannot connect to {:?}", e); - // cannot return Err, or raft fail completely - } - }; + let handle = self.spawn_replication_stream(*target, *matching).await; + + if let Some(l) = &mut self.leader_data { + l.nodes.insert(*target, handle); + } else { + unreachable!("it has to be a leader!!!"); + } } } Command::UpdateProgressMetrics { target, matching } => { diff --git a/openraft/src/error.rs b/openraft/src/error.rs index c570e2daf..b48457941 100644 --- a/openraft/src/error.rs +++ b/openraft/src/error.rs @@ -209,10 +209,6 @@ where { #[error(transparent)] ForwardToLeader(#[from] ForwardToLeader), - - // TODO: do we really need this error? An app may check an target node if it wants to. - #[error(transparent)] - NetworkError(#[from] NetworkError), } impl TryAsRef> for AddLearnerError @@ -223,7 +219,6 @@ where fn try_as_ref(&self) -> Option<&ForwardToLeader> { match self { Self::ForwardToLeader(f) => Some(f), - _ => None, } } } @@ -236,10 +231,8 @@ where type Error = AddLearnerError; fn try_from(value: AddLearnerError) -> Result { - if let AddLearnerError::ForwardToLeader(e) = value { - return Ok(e); - } - Err(value) + let AddLearnerError::ForwardToLeader(e) = value; + Ok(e) } } diff --git a/openraft/src/network.rs b/openraft/src/network.rs index 782ba04fa..b07f65145 100644 --- a/openraft/src/network.rs +++ b/openraft/src/network.rs @@ -1,6 +1,5 @@ //! The Raft network interface. -use std::error::Error; use std::fmt::Formatter; use async_trait::async_trait; @@ -81,22 +80,14 @@ where C: RaftTypeConfig /// Actual type of the network handling a single connection. type Network: RaftNetwork; - /// The error that an implementation returns when `connect()` fails. - // TODO: renaming it to `create()` would be better? - type ConnectionError: Error + Send + Sync; - /// Create a new network instance sending RPCs to the target node. /// /// This function should **not** create a connection but rather a client that will connect when - /// required + /// required. Therefore there is chance it will build a client that is unable to send out + /// anything, e.g., in case the Node network address is configured incorrectly. But this method + /// does not return an error because openraft can only ignore it. /// /// The method is intentionally async to give the implementation a chance to use asynchronous /// sync primitives to serialize access to the common internal object, if needed. - /// - /// # Errors - /// When this function errors, it indicates a non-recoverable error in the network, no retries - /// will be attempted, and the corresponding node will be constantly unavailable. - /// Any recoverable error (such as unreachable host) should be returned as `Ok` with a client - /// that will perform the reconnect - async fn new_client(&mut self, target: C::NodeId, node: &C::Node) -> Result; + async fn new_client(&mut self, target: C::NodeId, node: &C::Node) -> Self::Network; } diff --git a/openraft/tests/append_entries/t10_conflict_with_empty_entries.rs b/openraft/tests/append_entries/t10_conflict_with_empty_entries.rs index 4ae099868..74bb305bc 100644 --- a/openraft/tests/append_entries/t10_conflict_with_empty_entries.rs +++ b/openraft/tests/append_entries/t10_conflict_with_empty_entries.rs @@ -57,7 +57,7 @@ async fn conflict_with_empty_entries() -> Result<()> { leader_commit: Some(LogId::new(LeaderId::new(1, 0), 5)), }; - let resp = router.new_client(0, &()).await?.send_append_entries(rpc).await?; + let resp = router.new_client(0, &()).await.send_append_entries(rpc).await?; assert!(!resp.is_success()); assert!(resp.is_conflict()); @@ -77,7 +77,7 @@ async fn conflict_with_empty_entries() -> Result<()> { leader_commit: Some(LogId::new(LeaderId::new(1, 0), 5)), }; - let resp = router.new_client(0, &()).await?.send_append_entries(rpc).await?; + let resp = router.new_client(0, &()).await.send_append_entries(rpc).await?; assert!(resp.is_success()); assert!(!resp.is_conflict()); @@ -90,7 +90,7 @@ async fn conflict_with_empty_entries() -> Result<()> { leader_commit: Some(LogId::new(LeaderId::new(1, 0), 5)), }; - let resp = router.new_client(0, &()).await?.send_append_entries(rpc).await?; + let resp = router.new_client(0, &()).await.send_append_entries(rpc).await?; assert!(!resp.is_success()); assert!(resp.is_conflict()); diff --git a/openraft/tests/append_entries/t10_see_higher_vote.rs b/openraft/tests/append_entries/t10_see_higher_vote.rs index fc51e6e5d..8d65b8b2b 100644 --- a/openraft/tests/append_entries/t10_see_higher_vote.rs +++ b/openraft/tests/append_entries/t10_see_higher_vote.rs @@ -36,7 +36,7 @@ async fn append_sees_higher_vote() -> Result<()> { { router .new_client(1, &()) - .await? + .await .send_vote(VoteRequest { vote: Vote::new(10, 1), last_log_id: Some(LogId::new(LeaderId::new(10, 1), 5)), diff --git a/openraft/tests/append_entries/t50_append_entries_with_bigger_term.rs b/openraft/tests/append_entries/t50_append_entries_with_bigger_term.rs index ceb4c7435..adaa6f17b 100644 --- a/openraft/tests/append_entries/t50_append_entries_with_bigger_term.rs +++ b/openraft/tests/append_entries/t50_append_entries_with_bigger_term.rs @@ -42,7 +42,7 @@ async fn append_entries_with_bigger_term() -> Result<()> { leader_commit: Some(LogId::new(LeaderId::new(1, 0), log_index)), }; - let resp = router.new_client(0, &()).await?.send_append_entries(req).await?; + let resp = router.new_client(0, &()).await.send_append_entries(req).await?; assert!(resp.is_success()); // after append entries, check hard state in term 2 and vote for node 1 diff --git a/openraft/tests/fixtures/mod.rs b/openraft/tests/fixtures/mod.rs index fb922b840..aad00113a 100644 --- a/openraft/tests/fixtures/mod.rs +++ b/openraft/tests/fixtures/mod.rs @@ -955,20 +955,12 @@ where S: Default + Clone, { type Network = RaftRouterNetwork; - type ConnectionError = NetworkError; - async fn new_client(&mut self, target: C::NodeId, _node: &C::Node) -> Result { - { - let unreachable = self.unconnectable.lock().unwrap(); - if unreachable.contains(&target) { - let e = NetworkError::new(&AnyError::error(format!("failed to connect: {}", target))); - return Err(e); - } - } - Ok(RaftRouterNetwork { + async fn new_client(&mut self, target: C::NodeId, _node: &C::Node) -> Self::Network { + RaftRouterNetwork { target, owner: self.clone(), - }) + } } } diff --git a/openraft/tests/log_compaction/t10_compaction.rs b/openraft/tests/log_compaction/t10_compaction.rs index cc8ce6eca..5acc916ab 100644 --- a/openraft/tests/log_compaction/t10_compaction.rs +++ b/openraft/tests/log_compaction/t10_compaction.rs @@ -138,7 +138,7 @@ async fn compaction() -> Result<()> { { let res = router .new_client(1, &()) - .await? + .await .send_append_entries(AppendEntriesRequest { vote: Vote::new_committed(1, 0), prev_log_id: Some(LogId::new(LeaderId::new(1, 0), 2)), diff --git a/openraft/tests/snapshot/t41_snapshot_overrides_membership.rs b/openraft/tests/snapshot/t41_snapshot_overrides_membership.rs index f50332827..b2cbf66d1 100644 --- a/openraft/tests/snapshot/t41_snapshot_overrides_membership.rs +++ b/openraft/tests/snapshot/t41_snapshot_overrides_membership.rs @@ -98,7 +98,7 @@ async fn snapshot_overrides_membership() -> Result<()> { }], leader_commit: Some(LogId::new(LeaderId::new(0, 0), 0)), }; - router.new_client(1, &()).await?.send_append_entries(req).await?; + router.new_client(1, &()).await.send_append_entries(req).await?; tracing::info!("--- check that learner membership is affected"); { diff --git a/openraft/tests/snapshot/t43_snapshot_delete_conflict_logs.rs b/openraft/tests/snapshot/t43_snapshot_delete_conflict_logs.rs index 949933862..37a704731 100644 --- a/openraft/tests/snapshot/t43_snapshot_delete_conflict_logs.rs +++ b/openraft/tests/snapshot/t43_snapshot_delete_conflict_logs.rs @@ -116,7 +116,7 @@ async fn snapshot_delete_conflicting_logs() -> Result<()> { ], leader_commit: Some(LogId::new(LeaderId::new(1, 0), 2)), }; - router.new_client(1, &()).await?.send_append_entries(req).await?; + router.new_client(1, &()).await.send_append_entries(req).await?; tracing::info!("--- check that learner membership is affected"); { @@ -146,7 +146,7 @@ async fn snapshot_delete_conflicting_logs() -> Result<()> { done: true, }; - router.new_client(1, &()).await?.send_install_snapshot(req).await?; + router.new_client(1, &()).await.send_install_snapshot(req).await?; tracing::info!("--- DONE installing snapshot");