diff --git a/core/benches/sigverify_stage.rs b/core/benches/sigverify_stage.rs index 3f11cc150574d3..1d18b8cdceb36c 100644 --- a/core/benches/sigverify_stage.rs +++ b/core/benches/sigverify_stage.rs @@ -158,7 +158,7 @@ fn bench_sigverify_stage(bencher: &mut Bencher, use_same_tx: bool) { trace!("start"); let (packet_s, packet_r) = unbounded(); let (verified_s, verified_r) = BankingTracer::channel_for_test(); - let verifier = TransactionSigVerifier::new(verified_s); + let verifier = TransactionSigVerifier::new(verified_s, None); let stage = SigVerifyStage::new(packet_r, verifier, "solSigVerBench", "bench"); bencher.iter(move || { @@ -237,7 +237,7 @@ fn prepare_batches(discard_factor: i32) -> (Vec, usize) { fn bench_shrink_sigverify_stage_core(bencher: &mut Bencher, discard_factor: i32) { let (batches0, num_valid_packets) = prepare_batches(discard_factor); let (verified_s, _verified_r) = BankingTracer::channel_for_test(); - let verifier = TransactionSigVerifier::new(verified_s); + let verifier = TransactionSigVerifier::new(verified_s, None); let mut c = 0; let mut total_shrink_time = 0; diff --git a/core/src/forwarding_stage.rs b/core/src/forwarding_stage.rs new file mode 100644 index 00000000000000..80e8a7b548cc55 --- /dev/null +++ b/core/src/forwarding_stage.rs @@ -0,0 +1,35 @@ +use { + crate::banking_trace::BankingPacketReceiver, + std::thread::{Builder, JoinHandle}, +}; + +pub struct ForwardingStage { + fwd_thread_hdls: Vec>, +} + +impl ForwardingStage { + pub fn new(num_threads: usize, receiver: BankingPacketReceiver) -> Self { + Self { + fwd_thread_hdls: (0..num_threads) + .map(|index| { + let receiver = receiver.clone(); + Self::spawn_forwarding_thread(receiver, index) + }) + .collect(), + } + } + + pub fn join(self) -> std::thread::Result<()> { + for bank_thread_hdl in self.fwd_thread_hdls { + bank_thread_hdl.join()?; + } + Ok(()) + } + + fn spawn_forwarding_thread(receiver: BankingPacketReceiver, index: usize) -> JoinHandle<()> { + Builder::new() + .name(format!("solFwdStg{index:02}")) + .spawn(move || while let Ok(_packet_batches) = receiver.recv() {}) + .unwrap() + } +} diff --git a/core/src/lib.rs b/core/src/lib.rs index c88488f0876667..9befe3b5f8b385 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -22,6 +22,7 @@ pub mod consensus; pub mod cost_update_service; pub mod drop_bank_service; pub mod fetch_stage; +mod forwarding_stage; pub mod gen_keys; pub mod next_leader; pub mod optimistic_confirmation_verifier; diff --git a/core/src/sigverify.rs b/core/src/sigverify.rs index 18984ecc4ef836..aebeafe58ffaef 100644 --- a/core/src/sigverify.rs +++ b/core/src/sigverify.rs @@ -12,6 +12,7 @@ use { banking_trace::{BankingPacketBatch, BankingPacketSender}, sigverify_stage::{SigVerifier, SigVerifyServiceError}, }, + crossbeam_channel::Sender, solana_perf::{cuda_runtime::PinnedVec, packet::PacketBatch, recycler::Recycler, sigverify}, solana_sdk::{packet::Packet, saturating_add_assign}, }; @@ -56,7 +57,8 @@ impl SigverifyTracerPacketStats { } pub struct TransactionSigVerifier { - packet_sender: BankingPacketSender, + banking_stage_sender: BankingPacketSender, + forward_stage_sender: Option>, tracer_packet_stats: SigverifyTracerPacketStats, recycler: Recycler, recycler_out: Recycler>, @@ -64,16 +66,23 @@ pub struct TransactionSigVerifier { } impl TransactionSigVerifier { - pub fn new_reject_non_vote(packet_sender: BankingPacketSender) -> Self { - let mut new_self = Self::new(packet_sender); + pub fn new_reject_non_vote( + banking_stage_sender: BankingPacketSender, + forward_stage_sender: Option>, + ) -> Self { + let mut new_self = Self::new(banking_stage_sender, forward_stage_sender); new_self.reject_non_vote = true; new_self } - pub fn new(packet_sender: BankingPacketSender) -> Self { + pub fn new( + banking_stage_sender: BankingPacketSender, + forward_stage_sender: Option>, + ) -> Self { init(); Self { - packet_sender, + banking_stage_sender, + forward_stage_sender, tracer_packet_stats: SigverifyTracerPacketStats::default(), recycler: Recycler::warmed(50, 4096), recycler_out: Recycler::warmed(50, 4096), @@ -127,10 +136,15 @@ impl SigVerifier for TransactionSigVerifier { packet_batches: Vec, ) -> Result<(), SigVerifyServiceError> { let tracer_packet_stats_to_send = std::mem::take(&mut self.tracer_packet_stats); - self.packet_sender.send(BankingPacketBatch::new(( - packet_batches, - Some(tracer_packet_stats_to_send), - )))?; + let banking_packet_batch = + BankingPacketBatch::new((packet_batches, Some(tracer_packet_stats_to_send))); + if let Some(forward_stage_sender) = &self.forward_stage_sender { + self.banking_stage_sender + .send(banking_packet_batch.clone())?; + forward_stage_sender.send(banking_packet_batch)?; + } else { + self.banking_stage_sender.send(banking_packet_batch)?; + } Ok(()) } diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index ac7d9889db0ed8..95fb29467527a4 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -548,7 +548,7 @@ mod tests { trace!("start"); let (packet_s, packet_r) = unbounded(); let (verified_s, verified_r) = BankingTracer::channel_for_test(); - let verifier = TransactionSigVerifier::new(verified_s); + let verifier = TransactionSigVerifier::new(verified_s, None); let stage = SigVerifyStage::new(packet_r, verifier, "solSigVerTest", "test"); let now = Instant::now(); diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 05982c9c3edf29..18fb7414cc5101 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -11,6 +11,7 @@ use { VerifiedVoteSender, VoteTracker, }, fetch_stage::FetchStage, + forwarding_stage::ForwardingStage, sigverify::TransactionSigVerifier, sigverify_stage::SigVerifyStage, staked_nodes_updater_service::StakedNodesUpdaterService, @@ -71,6 +72,7 @@ pub struct Tpu { sigverify_stage: SigVerifyStage, vote_sigverify_stage: SigVerifyStage, banking_stage: BankingStage, + forwarding_stage: ForwardingStage, cluster_info_vote_listener: ClusterInfoVoteListener, broadcast_stage: BroadcastStage, tpu_quic_t: thread::JoinHandle<()>, @@ -199,15 +201,22 @@ impl Tpu { ) .unwrap(); + let (forward_stage_sender, forward_stage_receiver) = unbounded(); let sigverify_stage = { - let verifier = TransactionSigVerifier::new(non_vote_sender); + let verifier = TransactionSigVerifier::new( + non_vote_sender, + enable_block_production_forwarding.then(|| forward_stage_sender.clone()), // only forward non-vote transactions if enabled + ); SigVerifyStage::new(packet_receiver, verifier, "solSigVerTpu", "tpu-verifier") }; let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote(); let vote_sigverify_stage = { - let verifier = TransactionSigVerifier::new_reject_non_vote(tpu_vote_sender); + let verifier = TransactionSigVerifier::new_reject_non_vote( + tpu_vote_sender, + Some(forward_stage_sender), + ); SigVerifyStage::new( vote_packet_receiver, verifier, @@ -249,6 +258,8 @@ impl Tpu { enable_block_production_forwarding, ); + let forwarding_stage = ForwardingStage::new(1, forward_stage_receiver); + let (entry_receiver, tpu_entry_notifier) = if let Some(entry_notification_sender) = entry_notification_sender { let (broadcast_entry_sender, broadcast_entry_receiver) = unbounded(); @@ -281,6 +292,7 @@ impl Tpu { sigverify_stage, vote_sigverify_stage, banking_stage, + forwarding_stage, cluster_info_vote_listener, broadcast_stage, tpu_quic_t, @@ -300,6 +312,7 @@ impl Tpu { self.vote_sigverify_stage.join(), self.cluster_info_vote_listener.join(), self.banking_stage.join(), + self.forwarding_stage.join(), self.staked_nodes_updater_service.join(), self.tpu_quic_t.join(), self.tpu_forwards_quic_t.join(),