perf(tree): add cross-block caching
Rjected committed Jan 10, 2025
1 parent 1bf8d50 commit cc02864
Showing 2 changed files with 129 additions and 3 deletions.
100 changes: 99 additions & 1 deletion crates/engine/tree/src/tree/cached_state.rs
@@ -1,14 +1,15 @@
//! Implements a state provider that has a shared cache in front of it.
use alloy_primitives::{map::B256HashMap, Address, StorageKey, StorageValue, B256};
use metrics::Gauge;
use moka::sync::CacheBuilder;
use moka::{sync::CacheBuilder, PredicateError};
use reth_errors::ProviderResult;
use reth_metrics::Metrics;
use reth_primitives::{Account, Bytecode};
use reth_provider::{
AccountReader, BlockHashReader, HashedPostStateProvider, StateProofProvider, StateProvider,
StateRootProvider, StorageRootProvider,
};
use reth_revm::db::BundleState;
use reth_trie::{
updates::TrieUpdates, AccountProof, HashedPostState, HashedStorage, MultiProof,
MultiProofTargets, StorageMultiProof, StorageProof, TrieInput,
@@ -44,6 +45,74 @@ where
}
}

impl<S> CachedStateProvider<S> {
/// Creates a new [`SavedCache`] from the given state updates and executed block hash.
///
/// This does not update the code cache, because no changes are required to the code cache on
/// state change.
///
/// NOTE: Consumers should ensure that these caches are not in use by a state provider for a
/// previous block - otherwise, this update will cause that state provider to contain future
/// state, which would be incorrect.
pub(crate) fn save_cache(
self,
executed_block_hash: B256,
state_updates: &BundleState,
) -> Result<SavedCache, PredicateError> {
let Self { caches, metrics, state_provider: _ } = self;

for (addr, account) in &state_updates.state {
// If the account was not modified (neither changed nor destroyed), there is nothing
// to do for this account and we can move on
if account.status.is_not_modified() {
continue
}

// if the account was destroyed, invalidate from the account / storage caches
if account.was_destroyed() {
// invalidate the account cache entry if destroyed
caches.account_cache.invalidate(addr);

// dereference so the `move` closure captures the address by value instead of
// borrowing it from `state_updates`
let addr = *addr;

// we also do not need to keep track of the returned PredicateId string
caches
.storage_cache
.invalidate_entries_if(move |(account_addr, _), _| addr == *account_addr)?;
continue
}

// a modified account with `None` current info should have been marked destroyed, so
// this state should be unrepresentable and reaching it means something has gone
// badly wrong
let Some(ref account_info) = account.info else {
todo!("error handling - a modified account has None info")
};

// insert will update if present, so we just use the new account info as the new value
// for the account cache
caches.account_cache.insert(*addr, Some(Account::from(account_info)));

// now we iterate over all storage and make updates to the cached storage values
for (storage_key, slot) in &account.storage {
// we convert the storage key from U256 to B256 because that is how it's represented
// in the cache
caches
.storage_cache
.insert((*addr, (*storage_key).into()), Some(slot.present_value));
}
}

// create a saved cache with the executed block hash, same metrics, and updated caches
let saved_cache = SavedCache { hash: executed_block_hash, caches, metrics };

Ok(saved_cache)
}
}

/// Metrics for the cached state provider, showing hits / misses for each cache
#[derive(Metrics, Clone)]
#[metrics(scope = "sync.caching")]
@@ -269,7 +338,10 @@ impl ProviderCacheBuilder
ProviderCaches {
code_cache: CacheBuilder::new(self.code_cache_size)
.build_with_hasher(DefaultHashBuilder::default()),
// we build the storage cache with closure invalidation so we can use
// `invalidate_entries_if` for storage invalidation
storage_cache: CacheBuilder::new(self.storage_cache_size)
.support_invalidation_closures()
.build_with_hasher(DefaultHashBuilder::default()),
account_cache: CacheBuilder::new(self.account_cache_size)
.build_with_hasher(DefaultHashBuilder::default()),
@@ -286,3 +358,29 @@ impl Default for ProviderCacheBuilder
Self { code_cache_size: 1000000, storage_cache_size: 1000000, account_cache_size: 1000000 }
}
}

/// A cache that was used while executing a specific block and has been updated with that
/// block's state changes.
#[derive(Debug)]
pub(crate) struct SavedCache {
/// The hash of the block these caches were used to execute.
hash: B256,

/// The caches used for the provider.
caches: ProviderCaches,

/// Metrics for the cached state provider
metrics: CachedStateMetrics,
}

impl SavedCache {
/// Returns the hash of the block this cache was last used to execute
pub(crate) const fn executed_block_hash(&self) -> B256 {
self.hash
}

/// Splits the cache into its caches and metrics, consuming it.
pub(crate) fn split(self) -> (ProviderCaches, CachedStateMetrics) {
(self.caches, self.metrics)
}
}
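
The storage-cache invalidation above leans on moka's closure-based invalidation, which must be enabled when the cache is built. Here is a minimal, self-contained sketch of that API, not part of this commit; the key/value types are illustrative placeholders rather than the ones used in reth:

use moka::sync::{Cache, CacheBuilder};

fn main() {
    // Mirrors `support_invalidation_closures()` above: without it,
    // `invalidate_entries_if` would return a `PredicateError`.
    let cache: Cache<(u64, u64), u64> = CacheBuilder::new(1_000)
        .support_invalidation_closures()
        .build();

    cache.insert((1, 1), 100); // (account, slot) -> value
    cache.insert((1, 2), 200);
    cache.insert((2, 1), 300);

    // Register a predicate that invalidates every entry belonging to
    // "account" 1; matching entries are removed lazily in the background.
    cache
        .invalidate_entries_if(|(account, _slot), _value| *account == 1)
        .expect("cache was built with invalidation closure support");
}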
32 changes: 30 additions & 2 deletions crates/engine/tree/src/tree/mod.rs
@@ -19,6 +19,7 @@ use alloy_rpc_types_engine::{
PayloadValidationError,
};
use block_buffer::BlockBuffer;
use cached_state::SavedCache;
use error::{InsertBlockError, InsertBlockErrorKind, InsertBlockFatalError};
use reth_chain_state::{
CanonicalInMemoryState, ExecutedBlock, MemoryOverlayStateProvider, NewCanonicalChain,
@@ -538,6 +539,8 @@ where
invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
/// The engine API variant of this handler
engine_kind: EngineApiKind,
/// The most recent cache used for execution.
most_recent_cache: Option<SavedCache>,
/// state root task thread pool
state_root_task_pool: Arc<rayon::ThreadPool>,
}
@@ -632,6 +635,7 @@ where
incoming_tx,
invalid_block_hook: Box::new(NoopInvalidBlockHook),
engine_kind,
most_recent_cache: None,
state_root_task_pool,
}
}
@@ -2192,6 +2196,19 @@ where
Ok(None)
}

/// This fetches the most recent saved cache, using the hash of the block we are trying to
/// execute on top of.
///
/// If the hash does not match, the saved cache holds state from some block other than this
/// block's parent, so it is not useful for this execution and we return `None`.
///
/// If there is no cache saved, this returns `None`.
///
/// This `take`s the cache to avoid cloning the entire cache.
fn take_latest_cache(&mut self, parent_hash: B256) -> Option<SavedCache> {
self.most_recent_cache.take_if(|cache| cache.executed_block_hash() == parent_hash)
}
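
`Option::take_if`, stabilized in Rust 1.80, is what guarantees the single saved cache is either handed out or left in place, never both. A standalone sketch of its semantics, with illustrative values rather than tree code:

fn main() {
    let mut saved: Option<&str> = Some("parent-hash");

    // Hash mismatch: the predicate fails, `None` is returned, and the
    // saved value stays in place for a later block.
    assert_eq!(saved.take_if(|h| *h == "other-hash"), None);
    assert_eq!(saved, Some("parent-hash"));

    // Hash match: the value is moved out and `None` is left behind, so
    // the same cache can never be handed to two blocks at once.
    assert_eq!(saved.take_if(|h| *h == "parent-hash"), Some("parent-hash"));
    assert_eq!(saved, None);
}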

fn insert_block_without_senders(
&mut self,
block: SealedBlockFor<N::Block>,
@@ -2260,8 +2277,13 @@ where

// Use the cached state provider before executing; reuse the most recent cache if it was
// built by this block's parent, otherwise start from fresh caches
let caches = ProviderCacheBuilder::default().build_caches();
let cache_metrics = CachedStateMetrics::zeroed();
let (caches, cache_metrics) =
if let Some(cache) = self.take_latest_cache(block.parent_hash()) {
cache.split()
} else {
(ProviderCacheBuilder::default().build_caches(), CachedStateMetrics::zeroed())
};

let state_provider =
CachedStateProvider::new_with_caches(state_provider, caches, cache_metrics);

@@ -2464,6 +2486,12 @@ where
self.metrics.block_validation.record_state_root(&trie_output, root_elapsed.as_secs_f64());
debug!(target: "engine::tree", ?root_elapsed, block=?sealed_block.num_hash(), "Calculated state root");

// apply state updates to cache and save it
let Ok(saved_cache) = state_provider.save_cache(sealed_block.hash(), &output.state) else {
todo!("error bubbling for save_cache errors")
};
self.most_recent_cache = Some(saved_cache);

let executed: ExecutedBlock<N> = ExecutedBlock {
block: sealed_block.clone(),
senders: Arc::new(block.senders().to_vec()),
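Taken together, the two files implement a simple hand-off: a block takes its parent's cache if one is saved, executes against it, folds its own state changes back in via `save_cache`, and stores the result for the next child block. A condensed, self-contained sketch of that flow follows; every type here is a simplified stand-in, not the actual engine-tree API:

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct Hash(u64);

#[derive(Debug)]
struct SavedCache {
    hash: Hash,
    entries: Vec<(u64, u64)>, // stand-in for the account/storage caches
}

impl SavedCache {
    fn executed_block_hash(&self) -> Hash {
        self.hash
    }
}

struct Tree {
    most_recent_cache: Option<SavedCache>,
}

impl Tree {
    fn insert_block(&mut self, parent_hash: Hash, block_hash: Hash) {
        // 1. Reuse the saved cache only if it was produced by this block's
        //    parent; otherwise start cold with empty caches.
        let mut entries = match self
            .most_recent_cache
            .take_if(|c| c.executed_block_hash() == parent_hash)
        {
            Some(cache) => cache.entries,
            None => Vec::new(),
        };

        // 2. "Execute" the block; the real code executes against a
        //    `CachedStateProvider` wrapping these caches.
        entries.push((block_hash.0, block_hash.0 * 10));

        // 3. Fold the block's state changes back in (standing in for
        //    `save_cache`) and store the result for the next child block.
        self.most_recent_cache = Some(SavedCache { hash: block_hash, entries });
    }
}

fn main() {
    let mut tree = Tree { most_recent_cache: None };
    tree.insert_block(Hash(0), Hash(1)); // cold start
    tree.insert_block(Hash(1), Hash(2)); // warm: parent matches
    tree.insert_block(Hash(9), Hash(3)); // cold again: fork / mismatch
    println!("final cache: {:?}", tree.most_recent_cache);
}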
