Skip to content

Commit

Permalink
core, core/rawdb, eth/sync: no tx indexing during snap sync (#28703)
Browse files Browse the repository at this point in the history
This change simplifies the logic for indexing transactions and enhances the UX when transaction is not found by returning more information to users.

Transaction indexing is now considered as a part of the initial sync, and `eth.syncing` will thus be `true` if transaction indexing is not yet finished. API consumers can use the syncing status to determine if the node is ready to serve users.
  • Loading branch information
rjl493456442 authored Jan 22, 2024
1 parent f55a10b commit 78a3c32
Show file tree
Hide file tree
Showing 23 changed files with 446 additions and 365 deletions.
202 changes: 114 additions & 88 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,24 @@ func DefaultCacheConfigWithScheme(scheme string) *CacheConfig {
return &config
}

// txLookup is wrapper over transaction lookup along with the corresponding
// transaction object.
type txLookup struct {
lookup *rawdb.LegacyTxLookupEntry
transaction *types.Transaction
}

// TxIndexProgress is the struct describing the progress for transaction indexing.
type TxIndexProgress struct {
Indexed uint64 // number of blocks whose transactions are indexed
Remaining uint64 // number of blocks whose transactions are not indexed yet
}

// Done returns an indicator if the transaction indexing is finished.
func (prog TxIndexProgress) Done() bool {
return prog.Remaining == 0
}

// BlockChain represents the canonical chain given a database with a genesis
// block. The Blockchain manages chain imports, reverts, chain reorganisations.
//
Expand Down Expand Up @@ -242,15 +260,18 @@ type BlockChain struct {
bodyRLPCache *lru.Cache[common.Hash, rlp.RawValue]
receiptsCache *lru.Cache[common.Hash, []*types.Receipt]
blockCache *lru.Cache[common.Hash, *types.Block]
txLookupCache *lru.Cache[common.Hash, *rawdb.LegacyTxLookupEntry]
txLookupCache *lru.Cache[common.Hash, txLookup]

// future blocks are blocks added for later processing
futureBlocks *lru.Cache[common.Hash, *types.Block]

wg sync.WaitGroup //
quit chan struct{} // shutdown signal, closed in Stop.
stopping atomic.Bool // false if chain is running, true when stopped
procInterrupt atomic.Bool // interrupt signaler for block processing
wg sync.WaitGroup
quit chan struct{} // shutdown signal, closed in Stop.
stopping atomic.Bool // false if chain is running, true when stopped
procInterrupt atomic.Bool // interrupt signaler for block processing

txIndexRunning bool // flag if the background tx indexer is activated
txIndexProgCh chan chan TxIndexProgress // chan for querying the progress of transaction indexing

engine consensus.Engine
validator Validator // Block and state validator interface
Expand Down Expand Up @@ -297,8 +318,9 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
bodyRLPCache: lru.NewCache[common.Hash, rlp.RawValue](bodyCacheLimit),
receiptsCache: lru.NewCache[common.Hash, []*types.Receipt](receiptsCacheLimit),
blockCache: lru.NewCache[common.Hash, *types.Block](blockCacheLimit),
txLookupCache: lru.NewCache[common.Hash, *rawdb.LegacyTxLookupEntry](txLookupCacheLimit),
txLookupCache: lru.NewCache[common.Hash, txLookup](txLookupCacheLimit),
futureBlocks: lru.NewCache[common.Hash, *types.Block](maxFutureBlocks),
txIndexProgCh: make(chan chan TxIndexProgress),
engine: engine,
vmConfig: vmConfig,
}
Expand Down Expand Up @@ -466,6 +488,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
// Start tx indexer/unindexer if required.
if txLookupLimit != nil {
bc.txLookupLimit = *txLookupLimit
bc.txIndexRunning = true

bc.wg.Add(1)
go bc.maintainTxIndex()
Expand Down Expand Up @@ -1155,14 +1178,13 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
// Ensure genesis is in ancients.
if first.NumberU64() == 1 {
if frozen, _ := bc.db.Ancients(); frozen == 0 {
b := bc.genesisBlock
td := bc.genesisBlock.Difficulty()
writeSize, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{b}, []types.Receipts{nil}, td)
size += writeSize
writeSize, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{bc.genesisBlock}, []types.Receipts{nil}, td)
if err != nil {
log.Error("Error writing genesis to ancients", "err", err)
return 0, err
}
size += writeSize
log.Info("Wrote genesis to ancients")
}
}
Expand All @@ -1176,44 +1198,11 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
// Write all chain data to ancients.
td := bc.GetTd(first.Hash(), first.NumberU64())
writeSize, err := rawdb.WriteAncientBlocks(bc.db, blockChain, receiptChain, td)
size += writeSize
if err != nil {
log.Error("Error importing chain data to ancients", "err", err)
return 0, err
}

// Write tx indices if any condition is satisfied:
// * If user requires to reserve all tx indices(txlookuplimit=0)
// * If all ancient tx indices are required to be reserved(txlookuplimit is even higher than ancientlimit)
// * If block number is large enough to be regarded as a recent block
// It means blocks below the ancientLimit-txlookupLimit won't be indexed.
//
// But if the `TxIndexTail` is not nil, e.g. Geth is initialized with
// an external ancient database, during the setup, blockchain will start
// a background routine to re-indexed all indices in [ancients - txlookupLimit, ancients)
// range. In this case, all tx indices of newly imported blocks should be
// generated.
batch := bc.db.NewBatch()
for i, block := range blockChain {
if bc.txLookupLimit == 0 || ancientLimit <= bc.txLookupLimit || block.NumberU64() >= ancientLimit-bc.txLookupLimit {
rawdb.WriteTxLookupEntriesByBlock(batch, block)
} else if rawdb.ReadTxIndexTail(bc.db) != nil {
rawdb.WriteTxLookupEntriesByBlock(batch, block)
}
stats.processed++

if batch.ValueSize() > ethdb.IdealBatchSize || i == len(blockChain)-1 {
size += int64(batch.ValueSize())
if err = batch.Write(); err != nil {
snapBlock := bc.CurrentSnapBlock().Number.Uint64()
if _, err := bc.db.TruncateHead(snapBlock + 1); err != nil {
log.Error("Can't truncate ancient store after failed insert", "err", err)
}
return 0, err
}
batch.Reset()
}
}
size += writeSize

// Sync the ancient store explicitly to ensure all data has been flushed to disk.
if err := bc.db.Sync(); err != nil {
Expand All @@ -1231,8 +1220,10 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
}

// Delete block data from the main database.
batch.Reset()
canonHashes := make(map[common.Hash]struct{})
var (
batch = bc.db.NewBatch()
canonHashes = make(map[common.Hash]struct{})
)
for _, block := range blockChain {
canonHashes[block.Hash()] = struct{}{}
if block.NumberU64() == 0 {
Expand All @@ -1250,13 +1241,16 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
if err := batch.Write(); err != nil {
return 0, err
}
stats.processed += int32(len(blockChain))
return 0, nil
}

// writeLive writes blockchain and corresponding receipt chain into active store.
writeLive := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
skipPresenceCheck := false
batch := bc.db.NewBatch()
var (
skipPresenceCheck = false
batch = bc.db.NewBatch()
)
for i, block := range blockChain {
// Short circuit insertion if shutting down or processing failed
if bc.insertStopped() {
Expand All @@ -1281,11 +1275,10 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
// Write all the data out into the database
rawdb.WriteBody(batch, block.Hash(), block.NumberU64(), block.Body())
rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receiptChain[i])
rawdb.WriteTxLookupEntriesByBlock(batch, block) // Always write tx indices for live blocks, we assume they are needed

// Write everything belongs to the blocks into the database. So that
// we can ensure all components of body is completed(body, receipts,
// tx indexes)
// we can ensure all components of body is completed(body, receipts)
// except transaction indexes(will be created once sync is finished).
if batch.ValueSize() >= ethdb.IdealBatchSize {
if err := batch.Write(); err != nil {
return 0, err
Expand Down Expand Up @@ -1317,19 +1310,6 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
return n, err
}
}
// Write the tx index tail (block number from where we index) before write any live blocks
if len(liveBlocks) > 0 && liveBlocks[0].NumberU64() == ancientLimit+1 {
// The tx index tail can only be one of the following two options:
// * 0: all ancient blocks have been indexed
// * ancient-limit: the indices of blocks before ancient-limit are ignored
if tail := rawdb.ReadTxIndexTail(bc.db); tail == nil {
if bc.txLookupLimit == 0 || ancientLimit <= bc.txLookupLimit {
rawdb.WriteTxIndexTail(bc.db, 0)
} else {
rawdb.WriteTxIndexTail(bc.db, ancientLimit-bc.txLookupLimit)
}
}
}
if len(liveBlocks) > 0 {
if n, err := writeLive(liveBlocks, liveReceipts); err != nil {
if err == errInsertionInterrupted {
Expand All @@ -1338,13 +1318,14 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
return n, err
}
}

head := blockChain[len(blockChain)-1]
context := []interface{}{
"count", stats.processed, "elapsed", common.PrettyDuration(time.Since(start)),
"number", head.Number(), "hash", head.Hash(), "age", common.PrettyAge(time.Unix(int64(head.Time()), 0)),
"size", common.StorageSize(size),
}
var (
head = blockChain[len(blockChain)-1]
context = []interface{}{
"count", stats.processed, "elapsed", common.PrettyDuration(time.Since(start)),
"number", head.Number(), "hash", head.Hash(), "age", common.PrettyAge(time.Unix(int64(head.Time()), 0)),
"size", common.StorageSize(size),
}
)
if stats.ignored > 0 {
context = append(context, []interface{}{"ignored", stats.ignored}...)
}
Expand All @@ -1360,7 +1341,6 @@ func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (e
if bc.insertStopped() {
return errInsertionInterrupted
}

batch := bc.db.NewBatch()
rawdb.WriteTd(batch, block.Hash(), block.NumberU64(), td)
rawdb.WriteBlock(batch, block)
Expand Down Expand Up @@ -2427,23 +2407,24 @@ func (bc *BlockChain) skipBlock(err error, it *insertIterator) bool {
func (bc *BlockChain) indexBlocks(tail *uint64, head uint64, done chan struct{}) {
defer func() { close(done) }()

// If head is 0, it means the chain is just initialized and no blocks are inserted,
// so don't need to indexing anything.
// If head is 0, it means the chain is just initialized and no blocks are
// inserted, so don't need to index anything.
if head == 0 {
return
}

// The tail flag is not existent, it means the node is just initialized
// and all blocks(may from ancient store) are not indexed yet.
// and all blocks in the chain (part of them may from ancient store) are
// not indexed yet, index the chain according to the configuration then.
if tail == nil {
from := uint64(0)
if bc.txLookupLimit != 0 && head >= bc.txLookupLimit {
from = head - bc.txLookupLimit + 1
}
rawdb.IndexTransactions(bc.db, from, head+1, bc.quit)
rawdb.IndexTransactions(bc.db, from, head+1, bc.quit, true)
return
}
// The tail flag is existent, but the whole chain is required to be indexed.
// The tail flag is existent (which means indexes in [tail, head] should be
// present), while the whole chain are requested for indexing.
if bc.txLookupLimit == 0 || head < bc.txLookupLimit {
if *tail > 0 {
// It can happen when chain is rewound to a historical point which
Expand All @@ -2453,17 +2434,58 @@ func (bc *BlockChain) indexBlocks(tail *uint64, head uint64, done chan struct{})
if end > head+1 {
end = head + 1
}
rawdb.IndexTransactions(bc.db, 0, end, bc.quit)
rawdb.IndexTransactions(bc.db, 0, end, bc.quit, true)
}
return
}
// Update the transaction index to the new chain state
// The tail flag is existent, adjust the index range according to configuration
// and latest head.
if head-bc.txLookupLimit+1 < *tail {
// Reindex a part of missing indices and rewind index tail to HEAD-limit
rawdb.IndexTransactions(bc.db, head-bc.txLookupLimit+1, *tail, bc.quit)
rawdb.IndexTransactions(bc.db, head-bc.txLookupLimit+1, *tail, bc.quit, true)
} else {
// Unindex a part of stale indices and forward index tail to HEAD-limit
rawdb.UnindexTransactions(bc.db, *tail, head-bc.txLookupLimit+1, bc.quit)
rawdb.UnindexTransactions(bc.db, *tail, head-bc.txLookupLimit+1, bc.quit, false)
}
}

// reportTxIndexProgress returns the tx indexing progress.
func (bc *BlockChain) reportTxIndexProgress(head uint64) TxIndexProgress {
var (
remaining uint64
tail = rawdb.ReadTxIndexTail(bc.db)
)
total := bc.txLookupLimit
if bc.txLookupLimit == 0 {
total = head + 1 // genesis included
}
var indexed uint64
if tail != nil {
indexed = head - *tail + 1
}
// The value of indexed might be larger than total if some blocks need
// to be unindexed, avoiding a negative remaining.
if indexed < total {
remaining = total - indexed
}
return TxIndexProgress{
Indexed: indexed,
Remaining: remaining,
}
}

// TxIndexProgress retrieves the tx indexing progress, or an error if the
// background tx indexer is not activated or already stopped.
func (bc *BlockChain) TxIndexProgress() (TxIndexProgress, error) {
if !bc.txIndexRunning {
return TxIndexProgress{}, errors.New("tx indexer is not activated")
}
ch := make(chan TxIndexProgress, 1)
select {
case bc.txIndexProgCh <- ch:
return <-ch, nil
case <-bc.quit:
return TxIndexProgress{}, errors.New("blockchain is closed")
}
}

Expand All @@ -2482,8 +2504,9 @@ func (bc *BlockChain) maintainTxIndex() {

// Listening to chain events and manipulate the transaction indexes.
var (
done chan struct{} // Non-nil if background unindexing or reindexing routine is active.
headCh = make(chan ChainHeadEvent, 1) // Buffered to avoid locking up the event feed
done chan struct{} // Non-nil if background unindexing or reindexing routine is active.
lastHead uint64 // The latest announced chain head (whose tx indexes are assumed created)
headCh = make(chan ChainHeadEvent, 1) // Buffered to avoid locking up the event feed
)
sub := bc.SubscribeChainHeadEvent(headCh)
if sub == nil {
Expand All @@ -2492,23 +2515,26 @@ func (bc *BlockChain) maintainTxIndex() {
defer sub.Unsubscribe()
log.Info("Initialized transaction indexer", "limit", bc.TxLookupLimit())

// Launch the initial processing if chain is not empty. This step is
// useful in these scenarios that chain has no progress and indexer
// is never triggered.
if head := rawdb.ReadHeadBlock(bc.db); head != nil {
// Launch the initial processing if chain is not empty (head != genesis).
// This step is useful in these scenarios that chain has no progress and
// indexer is never triggered.
if head := rawdb.ReadHeadBlock(bc.db); head != nil && head.Number().Uint64() != 0 {
done = make(chan struct{})
lastHead = head.Number().Uint64()
go bc.indexBlocks(rawdb.ReadTxIndexTail(bc.db), head.NumberU64(), done)
}

for {
select {
case head := <-headCh:
if done == nil {
done = make(chan struct{})
go bc.indexBlocks(rawdb.ReadTxIndexTail(bc.db), head.Block.NumberU64(), done)
}
lastHead = head.Block.NumberU64()
case <-done:
done = nil
case ch := <-bc.txIndexProgCh:
ch <- bc.reportTxIndexProgress(lastHead)
case <-bc.quit:
if done != nil {
log.Info("Waiting background transaction indexer to exit")
Expand Down
Loading

0 comments on commit 78a3c32

Please sign in to comment.