From a2d5f63961a78eb07de11ea4f19265c0c3c4562a Mon Sep 17 00:00:00 2001 From: Leon <316032931@qq.com> Date: Fri, 29 Jul 2022 14:21:37 +0800 Subject: [PATCH] [R4R] Pipecommit enable trie prefetcher (#992) --- core/state/snapshot/difflayer.go | 13 ++------ core/state/snapshot/disklayer.go | 4 --- core/state/snapshot/snapshot.go | 14 +++------ core/state/state_object.go | 12 +------- core/state/statedb.go | 32 ++++++++++---------- core/state/trie_prefetcher.go | 48 +++++++++++++++++++++++------- core/state/trie_prefetcher_test.go | 6 ++-- 7 files changed, 64 insertions(+), 65 deletions(-) diff --git a/core/state/snapshot/difflayer.go b/core/state/snapshot/difflayer.go index d2b1b2778b..ca20b3ea4b 100644 --- a/core/state/snapshot/difflayer.go +++ b/core/state/snapshot/difflayer.go @@ -118,9 +118,8 @@ type diffLayer struct { storageList map[common.Hash][]common.Hash // List of storage slots for iterated retrievals, one per account. Any existing lists are sorted if non-nil storageData map[common.Hash]map[common.Hash][]byte // Keyed storage slots for direct retrieval. one per account (nil means deleted) - verifiedCh chan struct{} // the difflayer is verified when verifiedCh is nil or closed - valid bool // mark the difflayer is valid or not. - accountCorrected bool // mark the accountData has been corrected ort not + verifiedCh chan struct{} // the difflayer is verified when verifiedCh is nil or closed + valid bool // mark the difflayer is valid or not. diffed *bloomfilter.Filter // Bloom filter tracking all the diffed items up to the disk layer @@ -294,14 +293,6 @@ func (dl *diffLayer) CorrectAccounts(accounts map[common.Hash][]byte) { defer dl.lock.Unlock() dl.accountData = accounts - dl.accountCorrected = true -} - -func (dl *diffLayer) AccountsCorrected() bool { - dl.lock.RLock() - defer dl.lock.RUnlock() - - return dl.accountCorrected } // Parent returns the subsequent layer of a diff layer. diff --git a/core/state/snapshot/disklayer.go b/core/state/snapshot/disklayer.go index 6d46496a71..ca4a1051aa 100644 --- a/core/state/snapshot/disklayer.go +++ b/core/state/snapshot/disklayer.go @@ -62,10 +62,6 @@ func (dl *diskLayer) Verified() bool { func (dl *diskLayer) CorrectAccounts(map[common.Hash][]byte) { } -func (dl *diskLayer) AccountsCorrected() bool { - return true -} - // Parent always returns nil as there's no layer below the disk. func (dl *diskLayer) Parent() snapshot { return nil diff --git a/core/state/snapshot/snapshot.go b/core/state/snapshot/snapshot.go index 967fc0c68c..2f13631607 100644 --- a/core/state/snapshot/snapshot.go +++ b/core/state/snapshot/snapshot.go @@ -113,9 +113,6 @@ type Snapshot interface { // CorrectAccounts updates account data for storing the correct data during pipecommit CorrectAccounts(map[common.Hash][]byte) - // AccountsCorrected checks whether the account data has been corrected during pipecommit - AccountsCorrected() bool - // Account directly retrieves the account associated with a particular hash in // the snapshot slim data format. Account(hash common.Hash) (*Account, error) @@ -131,6 +128,10 @@ type Snapshot interface { // Storage directly retrieves the storage data associated with a particular hash, // within a particular account. Storage(accountHash, storageHash common.Hash) ([]byte, error) + + // Parent returns the subsequent layer of a snapshot, or nil if the base was + // reached. + Parent() snapshot } // snapshot is the internal version of the snapshot data layer that supports some @@ -138,13 +139,6 @@ type Snapshot interface { type snapshot interface { Snapshot - // Parent returns the subsequent layer of a snapshot, or nil if the base was - // reached. - // - // Note, the method is an internal helper to avoid type switching between the - // disk and diff layers. There is no locking involved. - Parent() snapshot - // Update creates a new layer on top of the existing snapshot diff tree with // the specified data items. // diff --git a/core/state/state_object.go b/core/state/state_object.go index bd6192c964..6d7ce60b37 100644 --- a/core/state/state_object.go +++ b/core/state/state_object.go @@ -351,18 +351,8 @@ func (s *StateObject) finalise(prefetch bool) { } } - // The account root need to be updated before prefetch, otherwise the account root is empty - if s.db.pipeCommit && s.data.Root == dummyRoot && !s.rootCorrected && s.db.snap.AccountsCorrected() { - if acc, err := s.db.snap.Account(crypto.HashData(s.db.hasher, s.address.Bytes())); err == nil { - if acc != nil && len(acc.Root) != 0 { - s.data.Root = common.BytesToHash(acc.Root) - s.rootCorrected = true - } - } - } - prefetcher := s.db.prefetcher - if prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != emptyRoot && s.data.Root != dummyRoot { + if prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != emptyRoot { prefetcher.prefetch(s.data.Root, slotsToPrefetch, s.addrHash) } if len(s.dirtyStorage) > 0 { diff --git a/core/state/statedb.go b/core/state/statedb.go index ec3d80e7f1..cb36d34d54 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -50,10 +50,6 @@ var ( // emptyRoot is the known root hash of an empty trie. emptyRoot = common.HexToHash("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421") - // dummyRoot is the dummy account root before corrected in pipecommit sync mode, - // the value is 542e5fc2709de84248e9bce43a9c0c8943a608029001360f8ab55bf113b23d28 - dummyRoot = crypto.Keccak256Hash([]byte("dummy_account_root")) - emptyAddr = crypto.Keccak256Hash(common.Address{}.Bytes()) ) @@ -218,7 +214,12 @@ func (s *StateDB) StartPrefetcher(namespace string) { s.prefetcher = nil } if s.snap != nil { - s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace) + parent := s.snap.Parent() + if parent != nil { + s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, parent.Root(), namespace) + } else { + s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, common.Hash{}, namespace) + } } } @@ -1000,7 +1001,11 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) { } prefetcher := s.prefetcher if prefetcher != nil && len(addressesToPrefetch) > 0 { - prefetcher.prefetch(s.originalRoot, addressesToPrefetch, emptyAddr) + if s.snap.Verified() { + prefetcher.prefetch(s.originalRoot, addressesToPrefetch, emptyAddr) + } else if prefetcher.rootParent != (common.Hash{}) { + prefetcher.prefetch(prefetcher.rootParent, addressesToPrefetch, emptyAddr) + } } // Invalidate journal because reverting across transactions is not allowed. s.clearJournalAndRefund() @@ -1035,11 +1040,12 @@ func (s *StateDB) CorrectAccountsRoot(blockRoot common.Hash) { } if accounts, err := snapshot.Accounts(); err == nil && accounts != nil { for _, obj := range s.stateObjects { - if !obj.deleted && !obj.rootCorrected && obj.data.Root == dummyRoot { + if !obj.deleted { if account, exist := accounts[crypto.Keccak256Hash(obj.address[:])]; exist { - obj.data.Root = common.BytesToHash(account.Root) - if obj.data.Root == (common.Hash{}) { + if len(account.Root) == 0 { obj.data.Root = emptyRoot + } else { + obj.data.Root = common.BytesToHash(account.Root) } obj.rootCorrected = true } @@ -1053,12 +1059,8 @@ func (s *StateDB) PopulateSnapAccountAndStorage() { for addr := range s.stateObjectsPending { if obj := s.stateObjects[addr]; !obj.deleted { if s.snap != nil { - root := obj.data.Root - storageChanged := s.populateSnapStorage(obj) - if storageChanged { - root = dummyRoot - } - s.snapAccounts[obj.address] = snapshot.SlimAccountRLP(obj.data.Nonce, obj.data.Balance, root, obj.data.CodeHash) + s.populateSnapStorage(obj) + s.snapAccounts[obj.address] = snapshot.SlimAccountRLP(obj.data.Nonce, obj.data.Balance, obj.data.Root, obj.data.CodeHash) } } } diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go index dd22f9d084..cd51820e9e 100644 --- a/core/state/trie_prefetcher.go +++ b/core/state/trie_prefetcher.go @@ -49,10 +49,11 @@ type prefetchMsg struct { // // Note, the prefetcher's API is not thread safe. type triePrefetcher struct { - db Database // Database to fetch trie nodes through - root common.Hash // Root hash of theaccount trie for metrics - fetches map[common.Hash]Trie // Partially or fully fetcher tries - fetchers map[common.Hash]*subfetcher // Subfetchers for each trie + db Database // Database to fetch trie nodes through + root common.Hash // Root hash of theaccount trie for metrics + rootParent common.Hash //Root has of the account trie from block before the prvious one, designed for pipecommit mode + fetches map[common.Hash]Trie // Partially or fully fetcher tries + fetchers map[common.Hash]*subfetcher // Subfetchers for each trie abortChan chan *subfetcher // to abort a single subfetcher and its children closed int32 @@ -70,16 +71,22 @@ type triePrefetcher struct { storageDupMeter metrics.Meter storageSkipMeter metrics.Meter storageWasteMeter metrics.Meter + + accountStaleLoadMeter metrics.Meter + accountStaleDupMeter metrics.Meter + accountStaleSkipMeter metrics.Meter + accountStaleWasteMeter metrics.Meter } // newTriePrefetcher -func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher { +func newTriePrefetcher(db Database, root, rootParent common.Hash, namespace string) *triePrefetcher { prefix := triePrefetchMetricsPrefix + namespace p := &triePrefetcher{ - db: db, - root: root, - fetchers: make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map - abortChan: make(chan *subfetcher, abortChanSize), + db: db, + root: root, + rootParent: rootParent, + fetchers: make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map + abortChan: make(chan *subfetcher, abortChanSize), closeMainChan: make(chan struct{}), closeMainDoneChan: make(chan struct{}), @@ -94,6 +101,11 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre storageDupMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup", nil), storageSkipMeter: metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil), storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil), + + accountStaleLoadMeter: metrics.GetOrRegisterMeter(prefix+"/accountst/load", nil), + accountStaleDupMeter: metrics.GetOrRegisterMeter(prefix+"/accountst/dup", nil), + accountStaleSkipMeter: metrics.GetOrRegisterMeter(prefix+"/accountst/skip", nil), + accountStaleWasteMeter: metrics.GetOrRegisterMeter(prefix+"/accountst/waste", nil), } go p.mainLoop() return p @@ -144,7 +156,8 @@ func (p *triePrefetcher) mainLoop() { } if metrics.EnabledExpensive { - if fetcher.root == p.root { + switch fetcher.root { + case p.root: p.accountLoadMeter.Mark(int64(len(fetcher.seen))) p.accountDupMeter.Mark(int64(fetcher.dups)) p.accountSkipMeter.Mark(int64(len(fetcher.tasks))) @@ -154,7 +167,19 @@ func (p *triePrefetcher) mainLoop() { } fetcher.lock.Unlock() p.accountWasteMeter.Mark(int64(len(fetcher.seen))) - } else { + + case p.rootParent: + p.accountStaleLoadMeter.Mark(int64(len(fetcher.seen))) + p.accountStaleDupMeter.Mark(int64(fetcher.dups)) + p.accountStaleSkipMeter.Mark(int64(len(fetcher.tasks))) + fetcher.lock.Lock() + for _, key := range fetcher.used { + delete(fetcher.seen, string(key)) + } + fetcher.lock.Unlock() + p.accountStaleWasteMeter.Mark(int64(len(fetcher.seen))) + + default: p.storageLoadMeter.Mark(int64(len(fetcher.seen))) p.storageDupMeter.Mark(int64(fetcher.dups)) p.storageSkipMeter.Mark(int64(len(fetcher.tasks))) @@ -165,6 +190,7 @@ func (p *triePrefetcher) mainLoop() { } fetcher.lock.Unlock() p.storageWasteMeter.Mark(int64(len(fetcher.seen))) + } } } diff --git a/core/state/trie_prefetcher_test.go b/core/state/trie_prefetcher_test.go index aa178dc9d0..8d8888fb24 100644 --- a/core/state/trie_prefetcher_test.go +++ b/core/state/trie_prefetcher_test.go @@ -55,7 +55,7 @@ func prefetchGuaranteed(prefetcher *triePrefetcher, root common.Hash, keys [][]b func TestCopyAndClose(t *testing.T) { db := filledStateDB() - prefetcher := newTriePrefetcher(db.db, db.originalRoot, "") + prefetcher := newTriePrefetcher(db.db, db.originalRoot, common.Hash{}, "") skey := common.HexToHash("aaa") prefetchGuaranteed(prefetcher, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{}) prefetchGuaranteed(prefetcher, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{}) @@ -80,7 +80,7 @@ func TestCopyAndClose(t *testing.T) { func TestUseAfterClose(t *testing.T) { db := filledStateDB() - prefetcher := newTriePrefetcher(db.db, db.originalRoot, "") + prefetcher := newTriePrefetcher(db.db, db.originalRoot, common.Hash{}, "") skey := common.HexToHash("aaa") prefetchGuaranteed(prefetcher, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{}) a := prefetcher.trie(db.originalRoot) @@ -96,7 +96,7 @@ func TestUseAfterClose(t *testing.T) { func TestCopyClose(t *testing.T) { db := filledStateDB() - prefetcher := newTriePrefetcher(db.db, db.originalRoot, "") + prefetcher := newTriePrefetcher(db.db, db.originalRoot, common.Hash{}, "") skey := common.HexToHash("aaa") prefetchGuaranteed(prefetcher, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{}) cpy := prefetcher.copy()