Skip to content

Commit

Permalink
temp
Browse files Browse the repository at this point in the history
  • Loading branch information
msmouse committed Nov 22, 2024
1 parent 7d41b65 commit c88743f
Show file tree
Hide file tree
Showing 23 changed files with 556 additions and 442 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

60 changes: 43 additions & 17 deletions execution/executor-types/src/execution_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,11 @@ use crate::{
transactions_with_output::{TransactionsToKeep, TransactionsWithOutput},
};
use aptos_drop_helper::DropHelper;
use aptos_storage_interface::{cached_state_view::StateCache, state_delta::StateDelta};
use aptos_storage_interface::{cached_state_view::StateCache, state_delta::InMemState};
use aptos_types::{
contract_event::ContractEvent,
epoch_state::EpochState,
transaction::{
block_epilogue::BlockEndInfo, ExecutionStatus, Transaction, TransactionStatus, Version,
},
transaction::{block_epilogue::BlockEndInfo, Transaction, TransactionStatus, Version},
};
use derive_more::Deref;
use std::sync::Arc;
Expand All @@ -37,8 +35,15 @@ impl ExecutionOutput {
state_cache: StateCache,
block_end_info: Option<BlockEndInfo>,
next_epoch_state: Option<EpochState>,
parent_state: InMemState,
last_checkpoint_state: Option<InMemState>,
result_state: InMemState,
subscribable_events: Planned<Vec<ContractEvent>>,
) -> Self {
assert_eq!(first_version, parent_state.next_version());
let next_version = first_version + to_commit.len() as Version;
assert_eq!(next_version, result_state.next_version());

if is_block {
// If it's a block, ensure it ends with state checkpoint.
assert!(to_commit.is_empty() || to_commit.ends_with_sole_checkpoint());
Expand All @@ -57,11 +62,14 @@ impl ExecutionOutput {
state_cache,
block_end_info,
next_epoch_state,
parent_state,
last_checkpoint_state,
result_state,
subscribable_events,
})
}

pub fn new_empty(state: Arc<StateDelta>) -> Self {
pub fn new_empty(state: InMemState) -> Self {
Self::new_impl(Inner {
is_block: false,
first_version: state.next_version(),
Expand All @@ -72,11 +80,15 @@ impl ExecutionOutput {
state_cache: StateCache::new_empty(state.current.clone()),
block_end_info: None,
next_epoch_state: None,
parent_state: state.clone(),
last_checkpoint_state: None,
result_state: state,
subscribable_events: Planned::ready(vec![]),
})
}

pub fn new_dummy_with_input_txns(txns: Vec<Transaction>) -> Self {
/*
let num_txns = txns.len();
let success_status = TransactionStatus::Keep(ExecutionStatus::Success);
Self::new_impl(Inner {
Expand All @@ -91,25 +103,31 @@ impl ExecutionOutput {
next_epoch_state: None,
subscribable_events: Planned::ready(vec![]),
})
*/
todo!() // FIXME(aldenhu)
}

pub fn new_dummy() -> Self {
Self::new_dummy_with_input_txns(vec![])
}

pub fn reconfig_suffix(&self) -> Self {
Self::new_impl(Inner {
is_block: false,
first_version: self.next_version(),
statuses_for_input_txns: vec![],
to_commit: TransactionsToKeep::new_empty(),
to_discard: TransactionsWithOutput::new_empty(),
to_retry: TransactionsWithOutput::new_empty(),
state_cache: StateCache::new_dummy(),
block_end_info: None,
next_epoch_state: self.next_epoch_state.clone(),
subscribable_events: Planned::ready(vec![]),
})
todo!() // FIXME(aldenhu)
/*
Self::new_impl(Inner {
is_block: false,
first_version: self.next_version(),
statuses_for_input_txns: vec![],
to_commit: TransactionsToKeep::new_empty(),
to_discard: TransactionsWithOutput::new_empty(),
to_retry: TransactionsWithOutput::new_empty(),
state_cache: StateCache::new_dummy(),
block_end_info: None,
next_epoch_state: self.next_epoch_state.clone(),
subscribable_events: Planned::ready(vec![]),
})
*/
}

fn new_impl(inner: Inner) -> Self {
Expand All @@ -134,6 +152,7 @@ impl ExecutionOutput {
#[derive(Debug)]
pub struct Inner {
pub is_block: bool,
// FIXME(aldenhu): redundant
pub first_version: Version,
// Statuses of the input transactions, in the same order as the input transactions.
// Contains BlockMetadata/Validator transactions,
Expand All @@ -144,6 +163,7 @@ pub struct Inner {
pub to_discard: TransactionsWithOutput,
pub to_retry: TransactionsWithOutput,

/// FIXME(aldenhu): is it useful now that we have the InMemState calculated already?
/// Carries the frozen base state view, so all in-mem nodes involved won't drop before the
/// execution result is processed; as well as all the accounts touched during execution, together
/// with their proofs.
Expand All @@ -155,6 +175,12 @@ pub struct Inner {
/// state cache.
pub next_epoch_state: Option<EpochState>,
pub subscribable_events: Planned<Vec<ContractEvent>>,

pub parent_state: InMemState,
/// n.b. For state sync chunks, it's possible that there's 0 to multiple state checkpoints in a
/// chunk, while for consensus the last transaction should always be a state checkpoint.
pub last_checkpoint_state: Option<InMemState>,
pub result_state: InMemState,
}

impl Inner {
Expand Down
30 changes: 17 additions & 13 deletions execution/executor-types/src/state_checkpoint_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@

use aptos_crypto::HashValue;
use aptos_drop_helper::DropHelper;
use aptos_storage_interface::state_delta::StateDelta;
use aptos_types::state_store::ShardedStateUpdates;
use aptos_storage_interface::{state_authenticator::StateAuthenticator, state_delta::StateDelta};
use derive_more::Deref;
use std::sync::Arc;

Expand All @@ -18,26 +17,30 @@ pub struct StateCheckpointOutput {

impl StateCheckpointOutput {
pub fn new(
parent_state: Arc<StateDelta>,
result_state: Arc<StateDelta>,
state_updates_before_last_checkpoint: Option<ShardedStateUpdates>,
parent_auth: StateAuthenticator,
last_checkpoint_auth: Option<StateAuthenticator>,
state_auth: StateAuthenticator,
state_checkpoint_hashes: Vec<Option<HashValue>>,
) -> Self {
Self::new_impl(Inner {
parent_state,
result_state,
state_updates_before_last_checkpoint,
parent_auth,
last_checkpoint_auth,
state_auth,
state_checkpoint_hashes,
})
}

pub fn new_empty(state: Arc<StateDelta>) -> Self {
/*
Self::new_impl(Inner {
parent_state: state.clone(),
result_state: state,
state_authenticator: state,
state_updates_before_last_checkpoint: None,
state_checkpoint_hashes: vec![],
})
*/
todo!() // FIXME(aldenhu)
}

pub fn new_dummy() -> Self {
Expand All @@ -51,14 +54,15 @@ impl StateCheckpointOutput {
}

pub fn reconfig_suffix(&self) -> Self {
Self::new_empty(self.result_state.clone())
Self::new_empty(self.state_auth.clone())
}
}

#[derive(Debug, Default)]
pub struct Inner {
pub parent_state: Arc<StateDelta>,
pub result_state: Arc<StateDelta>,
pub state_updates_before_last_checkpoint: Option<ShardedStateUpdates>,
/// FIXME(aldenhu): see if it's useful
pub parent_auth: StateAuthenticator,
pub last_checkpoint_auth: Option<StateAuthenticator>,
pub state_auth: StateAuthenticator,
pub state_checkpoint_hashes: Vec<Option<HashValue>>,
}
6 changes: 1 addition & 5 deletions execution/executor-types/src/state_compute_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,8 @@ impl StateComputeResult {
transaction_outputs: &self.execution_output.to_commit.transaction_outputs,
transaction_infos: &self.ledger_update_output.transaction_infos,
base_state_version: self.state_checkpoint_output.parent_state.base_version,
latest_in_memory_state: &self.state_checkpoint_output.result_state,
latest_in_memory_state: &self.state_checkpoint_output.state_auth,
state_update_refs: self.execution_output.to_commit.state_update_refs(),
state_updates_until_last_checkpoint: self
.state_checkpoint_output
.state_updates_before_last_checkpoint
.as_ref(),
sharded_state_cache: Some(&self.execution_output.state_cache.sharded_state_cache),
is_reconfig: self.execution_output.next_epoch_state.is_some(),
}
Expand Down
1 change: 1 addition & 0 deletions execution/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ aptos-drop-helper = { workspace = true }
aptos-executor-service = { workspace = true }
aptos-executor-types = { workspace = true }
aptos-experimental-runtimes = { workspace = true }
aptos-experimental-layered-map = { workspace = true }
aptos-indexer-grpc-table-info = { workspace = true }
aptos-infallible = { workspace = true }
aptos-logger = { workspace = true }
Expand Down
63 changes: 39 additions & 24 deletions execution/executor/src/block_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::{
use anyhow::Result;
use aptos_crypto::HashValue;
use aptos_executor_types::{
execution_output::ExecutionOutput, state_checkpoint_output,
state_compute_result::StateComputeResult, BlockExecutorTrait, ExecutorError, ExecutorResult,
};
use aptos_experimental_runtimes::thread_manager::THREAD_MANAGER;
Expand All @@ -39,6 +40,7 @@ use aptos_types::{
};
use aptos_vm::VMBlockExecutor;
use block_tree::BlockTree;
use bytes::Buf;
use fail::fail_point;
use std::sync::Arc;

Expand Down Expand Up @@ -103,7 +105,7 @@ where
.read()
.as_ref()
.expect("BlockExecutor is not reset")
.execute_and_state_checkpoint(block, parent_block_id, onchain_config)
.execute_block(block, parent_block_id, onchain_config)
}

fn ledger_update(
Expand Down Expand Up @@ -176,7 +178,7 @@ where
self.block_tree.root_block().id
}

fn execute_and_state_checkpoint(
fn execute_block(
&self,
block: ExecutableBlock,
parent_block_id: HashValue,
Expand All @@ -201,7 +203,7 @@ where
"execute_block"
);
let committed_block_id = self.committed_block_id();
let (execution_output, state_checkpoint_output) =
let execution_output =
if parent_block_id != committed_block_id && parent_output.has_reconfiguration() {
// ignore reconfiguration suffix, even if the block is non-empty
info!(
Expand All @@ -220,10 +222,8 @@ where

CachedStateView::new(
StateViewId::BlockExecution { block_id },
Arc::clone(&self.db.reader),
parent_output.execution_output.next_version(),
parent_output.expect_result_state().current.clone(),
Arc::new(AsyncProofFetcher::new(self.db.reader.clone())),
parent_output.execution_output.result_state.clone(),
self.db.clone().reader().clone(),
)?
};

Expand Down Expand Up @@ -259,7 +259,6 @@ where
(execution_output, state_checkpoint_output)
};
let output = PartialStateComputeResult::new(execution_output);
output.set_state_checkpoint_output(state_checkpoint_output);

let _ = self
.block_tree
Expand Down Expand Up @@ -288,33 +287,49 @@ where
// At this point of time two things must happen
// 1. The block tree must also have the current block id with or without the ledger update output.
// 2. We must have the ledger update output of the parent block.
let parent_output = parent_block.output.expect_ledger_update_output();
let parent_accumulator = parent_output.txn_accumulator();
let parent_output = &parent_block.output;
let block = block_vec.pop().expect("Must exist").unwrap();
let output = &block.output;
parent_block.ensure_has_child(block_id)?;
if let Some(complete_result) = block.output.get_complete_result() {
return Ok(complete_result);
}

let output =
if parent_block_id != committed_block_id && parent_block.output.has_reconfiguration() {
info!(
LogSchema::new(LogEntry::BlockExecutor).block_id(block_id),
"reconfig_descendant_block_received"
);
parent_output.reconfig_suffix()
} else {
THREAD_MANAGER.get_non_exe_cpu_pool().install(|| {
if parent_block_id != committed_block_id && parent_output.has_reconfiguration() {
info!(
LogSchema::new(LogEntry::BlockExecutor).block_id(block_id),
"reconfig_descendant_block_received"
);
output.set_state_checkpoint_output(
parent_output
.expect_state_checkpoint_output()
.reconfig_suffix(),
);
output.set_ledger_update_output(
parent_output
.expect_ledger_update_output()
.reconfig_suffix(),
);
} else {
output.set_state_checkpoint_output(DoStateCheckpoint::run(
&parent_output.execution_output,
&parent_output.expect_state_checkpoint_output().state_auth,
None, // known_state_checkpoints
)?);

output.set_ledger_update_output(THREAD_MANAGER.get_non_exe_cpu_pool().install(
|| {
DoLedgerUpdate::run(
&output.execution_output,
output.expect_state_checkpoint_output(),
parent_accumulator.clone(),
parent_output
.expect_ledger_update_output()
.transaction_accumulator
.clone(),
)
})?
};

block.output.set_ledger_update_output(output);
},
)?);
};

Ok(block.output.expect_complete_result())
}
Expand Down
Loading

0 comments on commit c88743f

Please sign in to comment.