Skip to content

Commit

Permalink
Merge branch 'stefan/send_pws' into stefan/improved_parallel_forknet
Browse files Browse the repository at this point in the history
  • Loading branch information
stedfn committed Jan 2, 2025
2 parents dbba188 + 7f8bb8f commit 6290e03
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 60 deletions.
19 changes: 10 additions & 9 deletions chain/client/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -638,15 +638,16 @@ pub(crate) static BLOCK_PRODUCER_MISSING_ENDORSEMENT_COUNT: LazyLock<HistogramVe
.unwrap()
});

pub(crate) static PARTIAL_WITNESS_ENCODE_TIME: LazyLock<HistogramVec> = LazyLock::new(|| {
try_create_histogram_vec(
"near_partial_witness_encode_time",
"Partial state witness generation from encoded state witness time in seconds",
&["shard_id"],
Some(linear_buckets(0.0, 0.005, 20).unwrap()),
)
.unwrap()
});
pub(crate) static PARTIAL_WITNESS_ENCODE_AND_SEND_TIME: LazyLock<HistogramVec> =
LazyLock::new(|| {
try_create_histogram_vec(
"near_partial_witness_encode_and_send_time",
"Partial state witness generation from encoded state witness time in seconds",
&["shard_id"],
Some(linear_buckets(0.0, 0.005, 20).unwrap()),
)
.unwrap()
});

pub(crate) static PARTIAL_WITNESS_DECODE_TIME: LazyLock<HistogramVec> = LazyLock::new(|| {
try_create_histogram_vec(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,14 +252,14 @@ impl PartialWitnessActor {
}

// Function to generate the parts of the state witness and return them as a tuple of chunk_validator and part.
fn generate_state_witness_parts(
fn generate_and_send_state_witness_parts(
&mut self,
epoch_id: EpochId,
chunk_header: &ShardChunkHeader,
witness_bytes: EncodedChunkStateWitness,
chunk_validators: &[AccountId],
signer: &ValidatorSigner,
) -> Result<Vec<(AccountId, PartialEncodedStateWitness)>, Error> {
) -> Result<(), Error> {
tracing::debug!(
target: "client",
chunk_hash=?chunk_header.chunk_hash(),
Expand All @@ -271,24 +271,29 @@ impl PartialWitnessActor {
let encoder = self.witness_encoders.entry(chunk_validators.len());
let (parts, encoded_length) = encoder.encode(&witness_bytes);

Ok(chunk_validators
.iter()
.zip_eq(parts)
.enumerate()
.map(|(part_ord, (chunk_validator, part))| {
// It's fine to unwrap part here as we just constructed the parts above and we expect
// all of them to be present.
let partial_witness = PartialEncodedStateWitness::new(
epoch_id,
chunk_header.clone(),
part_ord,
part.unwrap().to_vec(),
encoded_length,
signer,
);
(chunk_validator.clone(), partial_witness)
})
.collect_vec())
for (part_ord, (chunk_validator, part)) in chunk_validators.iter().zip_eq(parts).enumerate()
{
// It's fine to unwrap part here as we just constructed the parts above and we expect
// all of them to be present.
let partial_witness = PartialEncodedStateWitness::new(
epoch_id,
chunk_header.clone(),
part_ord,
part.unwrap().to_vec(),
encoded_length,
signer,
);

// Send the parts to the corresponding chunk validator owners.
self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::PartialEncodedStateWitness(
chunk_validator.clone(),
partial_witness,
),
));
}

Ok(())
}

fn generate_contract_deploys_parts(
Expand Down Expand Up @@ -343,10 +348,10 @@ impl PartialWitnessActor {

// Record time taken to encode the state witness parts.
let shard_id_label = chunk_header.shard_id().to_string();
let encode_timer = metrics::PARTIAL_WITNESS_ENCODE_TIME
let encode_timer = metrics::PARTIAL_WITNESS_ENCODE_AND_SEND_TIME
.with_label_values(&[shard_id_label.as_str()])
.start_timer();
let validator_witness_tuple = self.generate_state_witness_parts(
self.generate_and_send_state_witness_parts(
epoch_id,
chunk_header,
witness_bytes,
Expand All @@ -360,13 +365,9 @@ impl PartialWitnessActor {
self.state_witness_tracker.record_witness_sent(
chunk_hash,
witness_size_in_bytes,
validator_witness_tuple.len(),
chunk_validators.len(),
);

// Send the parts to the corresponding chunk validator owners.
self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::PartialEncodedStateWitness(validator_witness_tuple),
));
Ok(())
}

Expand Down
14 changes: 6 additions & 8 deletions chain/client/src/test_utils/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -757,14 +757,12 @@ fn process_peer_manager_message_default(
}
}
}
NetworkRequests::PartialEncodedStateWitness(partial_witnesses) => {
for (account, partial_witness) in partial_witnesses {
for (i, name) in validators.iter().enumerate() {
if name == account {
connectors[i]
.partial_witness_sender
.send(PartialEncodedStateWitnessMessage(partial_witness.clone()));
}
NetworkRequests::PartialEncodedStateWitness(account, partial_witness) => {
for (i, name) in validators.iter().enumerate() {
if name == account {
connectors[i]
.partial_witness_sender
.send(PartialEncodedStateWitnessMessage(partial_witness.clone()));
}
}
}
Expand Down
14 changes: 6 additions & 8 deletions chain/network/src/peer_manager/peer_manager_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1069,14 +1069,12 @@ impl PeerManagerActor {
);
NetworkResponses::NoResponse
}
NetworkRequests::PartialEncodedStateWitness(validator_witness_tuple) => {
for (chunk_validator, partial_witness) in validator_witness_tuple {
self.state.send_message_to_account(
&self.clock,
&chunk_validator,
RoutedMessageBody::PartialEncodedStateWitness(partial_witness),
);
}
NetworkRequests::PartialEncodedStateWitness(chunk_validator, partial_witness) => {
self.state.send_message_to_account(
&self.clock,
&chunk_validator,
RoutedMessageBody::PartialEncodedStateWitness(partial_witness),
);
NetworkResponses::NoResponse
}
NetworkRequests::PartialEncodedStateWitnessForward(
Expand Down
12 changes: 5 additions & 7 deletions chain/network/src/test_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,13 +348,11 @@ fn network_message_to_partial_witness_handler(
None
}

NetworkRequests::PartialEncodedStateWitness(validator_witness_tuple) => {
for (target, partial_witness) in validator_witness_tuple.into_iter() {
shared_state
.senders_for_account(&target)
.partial_witness_sender
.send(PartialEncodedStateWitnessMessage(partial_witness));
}
NetworkRequests::PartialEncodedStateWitness(target, partial_witness) => {
shared_state
.senders_for_account(&target)
.partial_witness_sender
.send(PartialEncodedStateWitnessMessage(partial_witness));
None
}
NetworkRequests::PartialEncodedStateWitnessForward(chunk_validators, partial_witness) => {
Expand Down
2 changes: 1 addition & 1 deletion chain/network/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ pub enum NetworkRequests {
/// Message for a chunk endorsement, sent by a chunk validator to the block producer.
ChunkEndorsement(AccountId, ChunkEndorsement),
/// Message from chunk producer to set of chunk validators to send state witness part.
PartialEncodedStateWitness(Vec<(AccountId, PartialEncodedStateWitness)>),
PartialEncodedStateWitness(AccountId, PartialEncodedStateWitness),
/// Message from chunk validator to all other chunk validators to forward state witness part.
PartialEncodedStateWitnessForward(Vec<AccountId>, PartialEncodedStateWitness),
/// Requests an epoch sync
Expand Down

0 comments on commit 6290e03

Please sign in to comment.