Skip to content

Commit

Permalink
ForwardingStage: skeleton
Browse files Browse the repository at this point in the history
  • Loading branch information
apfitzge committed Oct 18, 2024
1 parent c3693a6 commit bd528e1
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 14 deletions.
4 changes: 2 additions & 2 deletions core/benches/sigverify_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ fn bench_sigverify_stage(bencher: &mut Bencher, use_same_tx: bool) {
trace!("start");
let (packet_s, packet_r) = unbounded();
let (verified_s, verified_r) = BankingTracer::channel_for_test();
let verifier = TransactionSigVerifier::new(verified_s);
let verifier = TransactionSigVerifier::new(verified_s, None);
let stage = SigVerifyStage::new(packet_r, verifier, "solSigVerBench", "bench");

bencher.iter(move || {
Expand Down Expand Up @@ -237,7 +237,7 @@ fn prepare_batches(discard_factor: i32) -> (Vec<PacketBatch>, usize) {
fn bench_shrink_sigverify_stage_core(bencher: &mut Bencher, discard_factor: i32) {
let (batches0, num_valid_packets) = prepare_batches(discard_factor);
let (verified_s, _verified_r) = BankingTracer::channel_for_test();
let verifier = TransactionSigVerifier::new(verified_s);
let verifier = TransactionSigVerifier::new(verified_s, None);

let mut c = 0;
let mut total_shrink_time = 0;
Expand Down
35 changes: 35 additions & 0 deletions core/src/forwarding_stage.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use {
crate::banking_trace::BankingPacketReceiver,
std::thread::{Builder, JoinHandle},
};

pub struct ForwardingStage {
fwd_thread_hdls: Vec<JoinHandle<()>>,
}

impl ForwardingStage {
pub fn new(num_threads: usize, receiver: BankingPacketReceiver) -> Self {
Self {
fwd_thread_hdls: (0..num_threads)
.map(|index| {
let receiver = receiver.clone();
Self::spawn_forwarding_thread(receiver, index)
})
.collect(),
}
}

pub fn join(self) -> std::thread::Result<()> {
for bank_thread_hdl in self.fwd_thread_hdls {
bank_thread_hdl.join()?;
}
Ok(())
}

fn spawn_forwarding_thread(receiver: BankingPacketReceiver, index: usize) -> JoinHandle<()> {
Builder::new()
.name(format!("solFwdStg{index:02}"))
.spawn(move || while let Ok(_packet_batches) = receiver.recv() {})
.unwrap()
}
}
1 change: 1 addition & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub mod consensus;
pub mod cost_update_service;
pub mod drop_bank_service;
pub mod fetch_stage;
mod forwarding_stage;
pub mod gen_keys;
pub mod next_leader;
pub mod optimistic_confirmation_verifier;
Expand Down
32 changes: 23 additions & 9 deletions core/src/sigverify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use {
banking_trace::{BankingPacketBatch, BankingPacketSender},
sigverify_stage::{SigVerifier, SigVerifyServiceError},
},
crossbeam_channel::Sender,
solana_perf::{cuda_runtime::PinnedVec, packet::PacketBatch, recycler::Recycler, sigverify},
solana_sdk::{packet::Packet, saturating_add_assign},
};
Expand Down Expand Up @@ -56,24 +57,32 @@ impl SigverifyTracerPacketStats {
}

pub struct TransactionSigVerifier {
packet_sender: BankingPacketSender,
banking_stage_sender: BankingPacketSender,
forward_stage_sender: Option<Sender<BankingPacketBatch>>,
tracer_packet_stats: SigverifyTracerPacketStats,
recycler: Recycler<TxOffset>,
recycler_out: Recycler<PinnedVec<u8>>,
reject_non_vote: bool,
}

impl TransactionSigVerifier {
pub fn new_reject_non_vote(packet_sender: BankingPacketSender) -> Self {
let mut new_self = Self::new(packet_sender);
pub fn new_reject_non_vote(
banking_stage_sender: BankingPacketSender,
forward_stage_sender: Option<Sender<BankingPacketBatch>>,
) -> Self {
let mut new_self = Self::new(banking_stage_sender, forward_stage_sender);
new_self.reject_non_vote = true;
new_self
}

pub fn new(packet_sender: BankingPacketSender) -> Self {
pub fn new(
banking_stage_sender: BankingPacketSender,
forward_stage_sender: Option<Sender<BankingPacketBatch>>,
) -> Self {
init();
Self {
packet_sender,
banking_stage_sender,
forward_stage_sender,
tracer_packet_stats: SigverifyTracerPacketStats::default(),
recycler: Recycler::warmed(50, 4096),
recycler_out: Recycler::warmed(50, 4096),
Expand Down Expand Up @@ -127,10 +136,15 @@ impl SigVerifier for TransactionSigVerifier {
packet_batches: Vec<PacketBatch>,
) -> Result<(), SigVerifyServiceError<Self::SendType>> {
let tracer_packet_stats_to_send = std::mem::take(&mut self.tracer_packet_stats);
self.packet_sender.send(BankingPacketBatch::new((
packet_batches,
Some(tracer_packet_stats_to_send),
)))?;
let banking_packet_batch =
BankingPacketBatch::new((packet_batches, Some(tracer_packet_stats_to_send)));
if let Some(forward_stage_sender) = &self.forward_stage_sender {
self.banking_stage_sender
.send(banking_packet_batch.clone())?;
forward_stage_sender.send(banking_packet_batch)?;
} else {
self.banking_stage_sender.send(banking_packet_batch)?;
}
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/sigverify_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ mod tests {
trace!("start");
let (packet_s, packet_r) = unbounded();
let (verified_s, verified_r) = BankingTracer::channel_for_test();
let verifier = TransactionSigVerifier::new(verified_s);
let verifier = TransactionSigVerifier::new(verified_s, None);
let stage = SigVerifyStage::new(packet_r, verifier, "solSigVerTest", "test");

let now = Instant::now();
Expand Down
17 changes: 15 additions & 2 deletions core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use {
VerifiedVoteSender, VoteTracker,
},
fetch_stage::FetchStage,
forwarding_stage::ForwardingStage,
sigverify::TransactionSigVerifier,
sigverify_stage::SigVerifyStage,
staked_nodes_updater_service::StakedNodesUpdaterService,
Expand Down Expand Up @@ -71,6 +72,7 @@ pub struct Tpu {
sigverify_stage: SigVerifyStage,
vote_sigverify_stage: SigVerifyStage,
banking_stage: BankingStage,
forwarding_stage: ForwardingStage,
cluster_info_vote_listener: ClusterInfoVoteListener,
broadcast_stage: BroadcastStage,
tpu_quic_t: thread::JoinHandle<()>,
Expand Down Expand Up @@ -199,15 +201,22 @@ impl Tpu {
)
.unwrap();

let (forward_stage_sender, forward_stage_receiver) = unbounded();
let sigverify_stage = {
let verifier = TransactionSigVerifier::new(non_vote_sender);
let verifier = TransactionSigVerifier::new(
non_vote_sender,
enable_block_production_forwarding.then(|| forward_stage_sender.clone()), // only forward non-vote transactions if enabled
);
SigVerifyStage::new(packet_receiver, verifier, "solSigVerTpu", "tpu-verifier")
};

let (tpu_vote_sender, tpu_vote_receiver) = banking_tracer.create_channel_tpu_vote();

let vote_sigverify_stage = {
let verifier = TransactionSigVerifier::new_reject_non_vote(tpu_vote_sender);
let verifier = TransactionSigVerifier::new_reject_non_vote(
tpu_vote_sender,
Some(forward_stage_sender),
);
SigVerifyStage::new(
vote_packet_receiver,
verifier,
Expand Down Expand Up @@ -249,6 +258,8 @@ impl Tpu {
enable_block_production_forwarding,
);

let forwarding_stage = ForwardingStage::new(1, forward_stage_receiver);

let (entry_receiver, tpu_entry_notifier) =
if let Some(entry_notification_sender) = entry_notification_sender {
let (broadcast_entry_sender, broadcast_entry_receiver) = unbounded();
Expand Down Expand Up @@ -281,6 +292,7 @@ impl Tpu {
sigverify_stage,
vote_sigverify_stage,
banking_stage,
forwarding_stage,
cluster_info_vote_listener,
broadcast_stage,
tpu_quic_t,
Expand All @@ -300,6 +312,7 @@ impl Tpu {
self.vote_sigverify_stage.join(),
self.cluster_info_vote_listener.join(),
self.banking_stage.join(),
self.forwarding_stage.join(),
self.staked_nodes_updater_service.join(),
self.tpu_quic_t.join(),
self.tpu_forwards_quic_t.join(),
Expand Down

0 comments on commit bd528e1

Please sign in to comment.