Skip to content

Commit

Permalink
Move payload_manager.notify_commit to after commit (#15361)
Browse files Browse the repository at this point in the history
  • Loading branch information
msmouse authored Nov 22, 2024
1 parent 31dad98 commit 7d41b65
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 24 deletions.
13 changes: 7 additions & 6 deletions consensus/src/pipeline/pipeline_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,6 @@ impl PipelineBuilder {
pre_commit_fut.clone(),
parent.post_pre_commit_fut.clone(),
self.state_sync_notifier.clone(),
self.payload_manager.clone(),
block.clone(),
),
&mut abort_handles,
Expand All @@ -335,6 +334,7 @@ impl PipelineBuilder {
pre_commit_fut.clone(),
commit_ledger_fut.clone(),
parent.post_commit_fut.clone(),
self.payload_manager.clone(),
block_store_callback,
block.clone(),
),
Expand Down Expand Up @@ -618,15 +618,12 @@ impl PipelineBuilder {
pre_commit: TaskFuture<PreCommitResult>,
parent_post_pre_commit: TaskFuture<PostCommitResult>,
state_sync_notifier: Arc<dyn ConsensusNotificationSender>,
payload_manager: Arc<dyn TPayloadManager>,
block: Arc<Block>,
) -> TaskResult<PostPreCommitResult> {
let compute_result = pre_commit.await?;
parent_post_pre_commit.await?;

let _tracker = Tracker::new("post_pre_commit", &block);
let payload = block.payload().cloned();
let timestamp = block.timestamp_usecs();
let _timer = counters::OP_COUNTERS.timer("pre_commit_notify");

let txns = compute_result.transactions_to_commit().to_vec();
Expand All @@ -640,8 +637,6 @@ impl PipelineBuilder {
error!(error = ?e, "Failed to notify state synchronizer");
}

let payload_vec = payload.into_iter().collect();
payload_manager.notify_commit(timestamp, payload_vec);
Ok(())
}

Expand Down Expand Up @@ -684,6 +679,7 @@ impl PipelineBuilder {
pre_commit_fut: TaskFuture<PreCommitResult>,
commit_ledger_fut: TaskFuture<CommitLedgerResult>,
parent_post_commit: TaskFuture<PostCommitResult>,
payload_manager: Arc<dyn TPayloadManager>,
block_store_callback: Box<dyn FnOnce(LedgerInfoWithSignatures) + Send + Sync>,
block: Arc<Block>,
) -> TaskResult<PostCommitResult> {
Expand All @@ -695,6 +691,11 @@ impl PipelineBuilder {
update_counters_for_block(&block);
update_counters_for_compute_result(&compute_result);

let payload = block.payload().cloned();
let timestamp = block.timestamp_usecs();
let payload_vec = payload.into_iter().collect();
payload_manager.notify_commit(timestamp, payload_vec);

if let Some(ledger_info_with_sigs) = maybe_ledger_info_with_sigs {
block_store_callback(ledger_info_with_sigs);
}
Expand Down
47 changes: 29 additions & 18 deletions consensus/src/state_computer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,15 +131,9 @@ impl ExecutionProxy {
tx
}

fn pre_commit_hook(
&self,
block: &Block,
payload_manager: Arc<dyn TPayloadManager>,
) -> PreCommitHook {
fn pre_commit_hook(&self) -> PreCommitHook {
let mut pre_commit_notifier = self.pre_commit_notifier.clone();
let state_sync_notifier = self.state_sync_notifier.clone();
let payload = block.payload().cloned();
let timestamp = block.timestamp_usecs();
Box::new(move |state_compute_result: &StateComputeResult| {
let state_compute_result = state_compute_result.clone();
Box::pin(async move {
Expand All @@ -158,16 +152,38 @@ impl ExecutionProxy {
) {
error!(error = ?e, "Failed to notify state synchronizer");
}

let payload_vec = payload.into_iter().collect();
payload_manager.notify_commit(timestamp, payload_vec);
}))
.await
.expect("Failed to send pre-commit notification");
})
})
}

fn commit_hook(
&self,
blocks: &[Arc<PipelinedBlock>],
callback: StateComputerCommitCallBackType,
finality_proof: LedgerInfoWithSignatures,
) -> NotificationType {
let payload_manager = self
.state
.read()
.as_ref()
.expect("must be set within an epoch")
.payload_manager
.clone();
let blocks = blocks.to_vec();
Box::pin(async move {
for block in blocks.iter() {
let payload = block.payload().cloned();
let payload_vec = payload.into_iter().collect();
let timestamp = block.timestamp_usecs();
payload_manager.notify_commit(timestamp, payload_vec);
}
callback(&blocks, finality_proof);
})
}

pub fn pipeline_builder(&self, commit_signer: Arc<ValidatorSigner>) -> PipelineBuilder {
let MutableState {
validators,
Expand Down Expand Up @@ -236,7 +252,7 @@ impl StateComputer for ExecutionProxy {

let txn_notifier = self.txn_notifier.clone();
let transaction_generator = BlockPreparer::new(
payload_manager.clone(),
payload_manager,
self.transaction_filter.clone(),
transaction_deduper.clone(),
transaction_shuffler.clone(),
Expand All @@ -260,7 +276,7 @@ impl StateComputer for ExecutionProxy {
parent_block_id,
transaction_generator,
block_executor_onchain_config,
self.pre_commit_hook(block, payload_manager),
self.pre_commit_hook(),
lifetime_guard,
)
.await;
Expand Down Expand Up @@ -343,14 +359,9 @@ impl StateComputer for ExecutionProxy {
)
.expect("spawn_blocking failed");

let blocks = blocks.to_vec();
let callback_fut = Box::pin(async move {
callback(&blocks, finality_proof);
});

self.commit_notifier
.clone()
.send(callback_fut)
.send(self.commit_hook(blocks, callback, finality_proof))
.await
.expect("Failed to send commit notification");

Expand Down

0 comments on commit 7d41b65

Please sign in to comment.