ForwardingStage #3222
base: master
Conversation
Force-pushed from bd528e1 to 41fdcf2
Thanks for this refactoring, it simplifies the networking code improvements on my side significantly. Some questions/comments:

- My understanding is that before, we forwarded only txs that were marked for forwarding by banking stage. Now we just forward all the traffic. If that is correct, what is the purpose of forwarding txs that are going to be added to the block?
- Quite a few forwarding metrics have been removed. What are the replacements for total_forwardable_packets and total_dropped_packets?
- We use a new unbounded channel for the ForwardingStage. I guess the assumption is that this stage drains the channel immediately and internally decides whether to send or drop according to the data budget (see the sketch after this list). The data budget is 12 MB/s of staked non-forwarded traffic, which probably means that almost all the traffic will be forwarded. The channel can only grow if we put data into it faster than we consume it in the run loop, and looking at this code, that doesn't seem to be the case: we take a read lock in PoH and allocate memory for the batch, so maybe only under extreme load.
- Separation between votes and txs: in the old code there were two different Forwarders, one for votes and one for txs. Now there is a single ForwardingStage for both; the only concern is that under high tx traffic we might drop votes due to budget limitations.
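For reference on the budget mechanics discussed above, a minimal self-contained token-bucket sketch of the data-budget idea. The type and field names are illustrative, not the actual agave DataBudget API; the 12 MB/s rate comes from the comment above.

use std::time::{Duration, Instant};

// Illustrative token bucket; not the real agave DataBudget API.
struct BudgetSketch {
    bytes_available: usize,
    bytes_per_interval: usize,
    interval: Duration,
    last_refill: Instant,
}

impl BudgetSketch {
    fn new(bytes_per_second: usize) -> Self {
        let bytes_per_interval = bytes_per_second / 10; // refill every 100 ms
        Self {
            bytes_available: bytes_per_interval, // start with one interval's budget
            bytes_per_interval,
            interval: Duration::from_millis(100),
            last_refill: Instant::now(),
        }
    }

    // Refill if the interval elapsed, then try to reserve len bytes.
    // Returning false means the packet should be dropped, not forwarded.
    fn take(&mut self, len: usize) -> bool {
        if self.last_refill.elapsed() >= self.interval {
            self.bytes_available = self.bytes_per_interval;
            self.last_refill = Instant::now();
        }
        if len <= self.bytes_available {
            self.bytes_available -= len;
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut budget = BudgetSketch::new(12 * 1024 * 1024); // ~12 MB/s from the comment
    assert!(budget.take(1232)); // a typical packet fits comfortably
}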
.zip(fee_budget_limits_vec.drain(..))
.zip(check_results)
.filter(|(_, check_result)| check_result.is_ok())
for (((transaction, max_age), fee_budget_limits), _check_result) in transactions
Minor, I wonder if it is possible to simplify with izip!
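For what it's worth, a rough, self-contained sketch of what the izip! form could look like. The stand-in data and the continue in place of the .filter() call are my own; only the variable names come from the diff above.

use itertools::izip; // assumes the itertools crate is available

fn main() {
    // Stand-in data for the transactions, max ages, fee budget limits,
    // and check results zipped together in the diff above.
    let transactions = vec!["tx_a", "tx_b"];
    let max_ages = vec![10u64, 20];
    let mut fee_budget_limits_vec = vec![100u64, 200];
    let check_results: Vec<Result<(), ()>> = vec![Ok(()), Err(())];

    // izip! yields flat tuples, replacing the nested .zip() chain and the
    // (((a, b), c), d) destructuring.
    for (transaction, max_age, fee_budget_limits, check_result) in izip!(
        transactions,
        max_ages,
        fee_budget_limits_vec.drain(..),
        check_results
    ) {
        if check_result.is_err() {
            continue; // stands in for the .filter(..is_ok()) on the chain
        }
        println!("{transaction} {max_age} {fee_budget_limits}");
    }
}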
yeah potentially possible but not gonna change it in this PR
core/src/forwarding_stage.rs
Outdated
udp_socket: UdpSocket,
}

impl<T: LikeClusterInfo> ForwardingStage<T> {
Am I right that there are two types of ForwardingStage, one for txs and one for TpuVotes? In that case it makes sense to extract a structure to handle the network, one for txs and one for votes, because they are delivered using different protocols. This can be done separately by me.
There is a single ForwardingStage that handles both votes and non-votes, although it handles them differently.
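A minimal sketch of that "one stage, two paths" shape. All names here are illustrative rather than the actual agave types; per the heads-up later in this thread, votes currently go over plain UDP while non-votes go through the QUIC connection cache.

use std::net::{SocketAddr, UdpSocket};

// Illustrative stand-in for the single stage described above.
struct ForwardingStageSketch {
    udp_socket: UdpSocket,
}

impl ForwardingStageSketch {
    fn forward(&self, packet: &[u8], addr: SocketAddr, is_simple_vote: bool) {
        if is_simple_vote {
            // Vote path: plain UDP today (QUIC support for forwarded votes
            // is flagged as follow-up work later in this thread).
            let _ = self.udp_socket.send_to(packet, addr);
        } else {
            // Non-vote path: the real code hands these to the QUIC client,
            // subject to the data budget; stubbed out here.
            self.send_via_quic(packet, addr);
        }
    }

    fn send_via_quic(&self, _packet: &[u8], _addr: SocketAddr) {
        // placeholder for the connection-cache send
    }
}

fn main() -> std::io::Result<()> {
    let stage = ForwardingStageSketch {
        udp_socket: UdpSocket::bind("127.0.0.1:0")?,
    };
    let addr: SocketAddr = "127.0.0.1:8001".parse().unwrap();
    stage.forward(&[0u8; 32], addr, true);
    Ok(())
}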
.map(|packet| packet.meta().is_simple_vote_tx())
.unwrap_or(false)
}
}
Maybe add tests here similar to what the forwarder had?
yeah, this was a draft because I wanted to see what broke (honestly surprised to see it was nothing) before I went back and added tests.
core/src/tpu.rs
Outdated
@@ -199,15 +201,22 @@ impl Tpu {
)
.unwrap();

let (forward_stage_sender, forward_stage_receiver) = unbounded();
I think it should be bounded; this way we are explicit about dropping some traffic in case of network problems. What currently happens is that we add futures sending tx batches to the runtime, and if there are a lot of batches and sending is slow, this queue will grow unnecessarily.
Agree. Backpressure handling could just be dropping txs on the floor and incrementing a counter. This would at least make it obvious if the forwarding stage can't keep up.
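A minimal sketch of that drop-and-count backpressure handling, assuming the crossbeam bounded channel already used elsewhere in the codebase; the capacity of 128 and the counter name are made up for illustration.

use crossbeam_channel::{bounded, TrySendError};
use std::sync::atomic::{AtomicU64, Ordering};

// Hypothetical metric; the real code would report this via datapoints.
static FORWARD_STAGE_DROPPED_BATCHES: AtomicU64 = AtomicU64::new(0);

fn main() {
    // Small, explicit capacity instead of unbounded(); 128 is a guess.
    let (forward_stage_sender, forward_stage_receiver) = bounded::<Vec<u8>>(128);

    let batch = vec![0u8; 1232];
    match forward_stage_sender.try_send(batch) {
        Ok(()) => {}
        Err(TrySendError::Full(_dropped_batch)) => {
            // Forwarding stage can't keep up: drop the batch on the floor,
            // but make the drop visible in metrics.
            FORWARD_STAGE_DROPPED_BATCHES.fetch_add(1, Ordering::Relaxed);
        }
        Err(TrySendError::Disconnected(_)) => {
            // Receiver side shut down; nothing to do.
        }
    }

    drop(forward_stage_receiver);
}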
)))?;
let banking_packet_batch =
    BankingPacketBatch::new((packet_batches, Some(tracer_packet_stats_to_send)));
if let Some(forward_stage_sender) = &self.forward_stage_sender {
Does the order of sending matter?
if let Some(forward_stage_sender) = &self.forward_stage_sender {
    forward_stage_sender.send(banking_packet_batch.clone())?;
}
self.banking_stage_sender.send(banking_packet_batch)?;
Only in that, if I had to choose a stage to have lower latency, it'd definitely be banking stage.
@@ -364,25 +343,6 @@ impl UnprocessedTransactionStorage {
}
}

pub fn filter_forwardable_packets_and_add_batches(
Before, the forwarder used this to filter outdated transactions; now we skip this. Is this filtering done somewhere higher in the stack now?
It's not; we can add additional filtering in ForwardingStage.
I was trying to keep it as simple as possible for v0 because what filtering we actually care about doing is somewhat questionable.
We only forward tpu votes and partnered connections' non-votes.
For votes, I think ideally we'd check age and possibly that they are valid staked votes.
The non-votes that we forward are coming from our partnered connections, so these should be somewhat trusted in terms of content... if they are spamming old transactions then they are wasting their money paying me to forward.
We do not forward all traffic.
Unsure what you mean by "marked" here.
100% we should add back in some metrics. As I stated in a comment above, I wanted to get this through CI to see what broke without making
Let's just simplify and make it a bounded channel. We can have a fairly small bound as well I think; signature verification should take longer than just spawning off some tasks to forward (I would guess, but need to actually bench/profile this)
While true that there were different threads doing the forwarding in the old setup, there was still a single
@apfitzge This is something I don't fully understand from the code. How do we check that a transaction is from these partners? By checking that it comes from a staked connection?
I might misunderstand how it was: my impression was that banking stage processed some transactions during the 400 ms interval, and those it didn't manage to process in that time were forwarded (if not held). I'm under the impression that now we forward regardless of the banking stage decision, which confuses me.
Exactly. It's either from a staked partner or it is a tpu vote (also staked), both of which we may want to forward.
If I am an operator and have a deal to forward some partner's packets, then we just do that regardless of what banking stage is doing. If we wait until after leader slots to forward, it is useless to do forwarding at that point.
To me this PR looks like a very good move forward, hence lgtm. I've already done the replacement of the client code on top of this PR, so I'm biased toward having it merged. But it would be good to see approvals from others.
@t-nelson @alessandrod or @tao-stones. Would like to get this in v2.1 so there's one less version to maintain this in.
This is a dead-simple forwarding approach. Maybe we should hold off on merging this until 2.2, so we have time to add prioritization in the above stages so that this will only pop the highest-priority packets when we go to forward.
@apfitzge I'm under the impression that the
Yeah, sorry, should have put this back into draft!
let (success, num_packets) = if tpu_vote_batch {
    let packets: Vec<_> = filtered_packets.into_iter().zip(repeat(addr)).collect();
    let num_packets = packets.len();
    let res = batch_send(&self.udp_socket, &packets);
@lijunwangs just a heads up that this is coming and we'll need to add quic support for forwarded votes. This is still in draft though
}
}

fn run(mut self) {
This is too easy to understand. Please complicate
Yeah, have been working in the background to make cost-model not need us to resolve addresses to calculate; should make this much better w/ no* allocations. *Unfortunately found an edge case in cost-model code the other day, and unless I write a new parser for
Problem
Summary of Changes
Fixes #