Skip to content

Commit

Permalink
[R4R]: Redesign triePrefetcher to make it thread safe (#972)
Browse files Browse the repository at this point in the history
* Redesign triePrefetcher to make it thread safe

There are 2 types of triePrefetcher instances:
1.New created triePrefetcher: it is key to do trie prefetch to speed up validation phase.
2.Copied triePrefetcher: it only copy the prefetched trie information, actually it won't do
  prefetch at all, the copied tries are all kept in p.fetches.

Here we try to improve the new created one, to make it concurrent safe, while the copied one's
behavior stay unchanged(its logic is very simple).
As commented in triePrefetcher struct, its APIs are not thread safe. So callers should make sure
the created triePrefetcher should be used within a single routine.
As we are trying to improve triePrefetcher, we would use it concurrently, so it is necessary to
redesign it for concurrent access.

The design is simple:
** start a mainLoop to do all the work, APIs just send channel message.

Others:
** remove the metrics copy, since it is useless for copied triePrefetcher
** for trie(), only get subfetcher through channel to reduce the workload of mainloop

* some code enhancement for triePrefetcher redesign

* some fixup: rename, temporary trie chan for concurrent safe.

* fix review comments

* add some protection in case the trie prefetcher is already stopped

* fix review comments

** make close concurrent safe
** fix potential deadlock

* replace channel by RWMutex for a few triePrefetcher APIs

For APIs like: trie(), copy(), used(), it is simpler and more efficient to
use a RWMutex instead of channel communicaton.
Since the mainLoop would be busy handling trie request, while these trie request
can be processed in parallism.

We would only keep prefetch and close within the mainLoop, since they could update
the fetchers

* add lock for subfecter.used access to make it concurrent safe

* no need to create channel for copied triePrefetcher

* fix trie_prefetcher_test.go

trie prefetcher’s behavior has changed, prefetch() won't create subfetcher immediately.
it is reasonable, but break the UT, to fix the failed UT
  • Loading branch information
setunapo authored Jul 7, 2022
1 parent 552e404 commit 8e74562
Show file tree
Hide file tree
Showing 2 changed files with 174 additions and 84 deletions.
232 changes: 156 additions & 76 deletions core/state/trie_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,29 @@ package state

import (
"sync"
"sync/atomic"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
)

const abortChanSize = 64
const (
abortChanSize = 64
concurrentChanSize = 10
)

var (
// triePrefetchMetricsPrefix is the prefix under which to publis the metrics.
triePrefetchMetricsPrefix = "trie/prefetch/"
)

type prefetchMsg struct {
root common.Hash
accountHash common.Hash
keys [][]byte
}

// triePrefetcher is an active prefetcher, which receives accounts or storage
// items and does trie-loading of them. The goal is to get as much useful content
// into the caches as possible.
Expand All @@ -42,8 +52,14 @@ type triePrefetcher struct {
fetches map[common.Hash]Trie // Partially or fully fetcher tries
fetchers map[common.Hash]*subfetcher // Subfetchers for each trie

abortChan chan *subfetcher
closeChan chan struct{}
closed int32
closeMainChan chan struct{} // it is to inform the mainLoop
closeMainDoneChan chan struct{}
fetchersMutex sync.RWMutex
prefetchChan chan *prefetchMsg // no need to wait for return

abortChan chan *subfetcher
closeAbortChan chan struct{} // it is used to inform abortLoop

deliveryMissMeter metrics.Meter
accountLoadMeter metrics.Meter
Expand All @@ -60,11 +76,15 @@ type triePrefetcher struct {
func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher {
prefix := triePrefetchMetricsPrefix + namespace
p := &triePrefetcher{
db: db,
root: root,
fetchers: make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map
abortChan: make(chan *subfetcher, abortChanSize),
closeChan: make(chan struct{}),
db: db,
root: root,
fetchers: make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map
abortChan: make(chan *subfetcher, abortChanSize),
closeAbortChan: make(chan struct{}),

closeMainChan: make(chan struct{}),
closeMainDoneChan: make(chan struct{}),
prefetchChan: make(chan *prefetchMsg, concurrentChanSize),

deliveryMissMeter: metrics.GetOrRegisterMeter(prefix+"/deliverymiss", nil),
accountLoadMeter: metrics.GetOrRegisterMeter(prefix+"/account/load", nil),
Expand All @@ -77,20 +97,62 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre
storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil),
}
go p.abortLoop()
go p.mainLoop()
return p
}

func (p *triePrefetcher) abortLoop() {
func (p *triePrefetcher) mainLoop() {
for {
select {
case fetcher := <-p.abortChan:
fetcher.abort()
case <-p.closeChan:
// drain fetcher channel
case pMsg := <-p.prefetchChan:
fetcher := p.fetchers[pMsg.root]
if fetcher == nil {
fetcher = newSubfetcher(p.db, pMsg.root, pMsg.accountHash)
p.fetchersMutex.Lock()
p.fetchers[pMsg.root] = fetcher
p.fetchersMutex.Unlock()
}
fetcher.schedule(pMsg.keys)

case <-p.closeMainChan:
for _, fetcher := range p.fetchers {
p.abortChan <- fetcher // safe to do multiple times
<-fetcher.term
if metrics.EnabledExpensive {
if fetcher.root == p.root {
p.accountLoadMeter.Mark(int64(len(fetcher.seen)))
p.accountDupMeter.Mark(int64(fetcher.dups))
p.accountSkipMeter.Mark(int64(len(fetcher.tasks)))
fetcher.lock.Lock()
for _, key := range fetcher.used {
delete(fetcher.seen, string(key))
}
fetcher.lock.Unlock()
p.accountWasteMeter.Mark(int64(len(fetcher.seen)))
} else {
p.storageLoadMeter.Mark(int64(len(fetcher.seen)))
p.storageDupMeter.Mark(int64(fetcher.dups))
p.storageSkipMeter.Mark(int64(len(fetcher.tasks)))

fetcher.lock.Lock()
for _, key := range fetcher.used {
delete(fetcher.seen, string(key))
}
fetcher.lock.Unlock()
p.storageWasteMeter.Mark(int64(len(fetcher.seen)))
}
}
}
close(p.closeAbortChan)
close(p.closeMainDoneChan)
p.fetchersMutex.Lock()
p.fetchers = nil
p.fetchersMutex.Unlock()

// drain all the channels before quit the loop
for {
select {
case fetcher := <-p.abortChan:
fetcher.abort()
case <-p.prefetchChan:
default:
return
}
Expand All @@ -99,73 +161,74 @@ func (p *triePrefetcher) abortLoop() {
}
}

func (p *triePrefetcher) abortLoop() {
for {
select {
case fetcher := <-p.abortChan:
fetcher.abort()
case <-p.closeAbortChan:
return
}
}
}

// close iterates over all the subfetchers, aborts any that were left spinning
// and reports the stats to the metrics subsystem.
func (p *triePrefetcher) close() {
for _, fetcher := range p.fetchers {
p.abortChan <- fetcher // safe to do multiple times
<-fetcher.term
if metrics.EnabledExpensive {
if fetcher.root == p.root {
p.accountLoadMeter.Mark(int64(len(fetcher.seen)))
p.accountDupMeter.Mark(int64(fetcher.dups))
p.accountSkipMeter.Mark(int64(len(fetcher.tasks)))

for _, key := range fetcher.used {
delete(fetcher.seen, string(key))
}
p.accountWasteMeter.Mark(int64(len(fetcher.seen)))
} else {
p.storageLoadMeter.Mark(int64(len(fetcher.seen)))
p.storageDupMeter.Mark(int64(fetcher.dups))
p.storageSkipMeter.Mark(int64(len(fetcher.tasks)))

for _, key := range fetcher.used {
delete(fetcher.seen, string(key))
}
p.storageWasteMeter.Mark(int64(len(fetcher.seen)))
}
}
// If the prefetcher is an inactive one, bail out
if p.fetches != nil {
return
}
if atomic.CompareAndSwapInt32(&p.closed, 0, 1) {
close(p.closeMainChan)
<-p.closeMainDoneChan // wait until all subfetcher are stopped
}
close(p.closeChan)
// Clear out all fetchers (will crash on a second call, deliberate)
p.fetchers = nil
}

// copy creates a deep-but-inactive copy of the trie prefetcher. Any trie data
// already loaded will be copied over, but no goroutines will be started. This
// is mostly used in the miner which creates a copy of it's actively mutated
// state to be sealed while it may further mutate the state.
func (p *triePrefetcher) copy() *triePrefetcher {
copy := &triePrefetcher{
db: p.db,
root: p.root,
fetches: make(map[common.Hash]Trie), // Active prefetchers use the fetches map

abortChan: make(chan *subfetcher),
closeChan: make(chan struct{}),
deliveryMissMeter: p.deliveryMissMeter,
accountLoadMeter: p.accountLoadMeter,
accountDupMeter: p.accountDupMeter,
accountSkipMeter: p.accountSkipMeter,
accountWasteMeter: p.accountWasteMeter,
storageLoadMeter: p.storageLoadMeter,
storageDupMeter: p.storageDupMeter,
storageSkipMeter: p.storageSkipMeter,
storageWasteMeter: p.storageWasteMeter,
}
// If the prefetcher is already a copy, duplicate the data
if p.fetches != nil {
fetcherCopied := &triePrefetcher{
db: p.db,
root: p.root,
fetches: make(map[common.Hash]Trie, len(p.fetches)),
}
// p.fetches is safe to be accessed outside of mainloop
// if the triePrefetcher is active, fetches will not be used in mainLoop
// otherwise, inactive triePrefetcher is readonly, it won't modify fetches
for root, fetch := range p.fetches {
copy.fetches[root] = p.db.CopyTrie(fetch)
fetcherCopied.fetches[root] = p.db.CopyTrie(fetch)
}
return copy
return fetcherCopied
}
// Otherwise we're copying an active fetcher, retrieve the current states
for root, fetcher := range p.fetchers {
copy.fetches[root] = fetcher.peek()

select {
case <-p.closeMainChan:
// for closed trie prefetcher, the fetches should not be nil
fetcherCopied := &triePrefetcher{
db: p.db,
root: p.root,
fetches: make(map[common.Hash]Trie),
}
return fetcherCopied
default:
p.fetchersMutex.RLock()
fetcherCopied := &triePrefetcher{
db: p.db,
root: p.root,
fetches: make(map[common.Hash]Trie, len(p.fetchers)),
}
// we're copying an active fetcher, retrieve the current states
for root, fetcher := range p.fetchers {
fetcherCopied.fetches[root] = fetcher.peek()
}
p.fetchersMutex.RUnlock()
return fetcherCopied
}
return copy
}

// prefetch schedules a batch of trie items to prefetch.
Expand All @@ -174,13 +237,10 @@ func (p *triePrefetcher) prefetch(root common.Hash, keys [][]byte, accountHash c
if p.fetches != nil {
return
}
// Active fetcher, schedule the retrievals
fetcher := p.fetchers[root]
if fetcher == nil {
fetcher = newSubfetcher(p.db, root, accountHash)
p.fetchers[root] = fetcher
select {
case <-p.closeMainChan: // skip closed trie prefetcher
case p.prefetchChan <- &prefetchMsg{root, accountHash, keys}:
}
fetcher.schedule(keys)
}

// trie returns the trie matching the root hash, or nil if the prefetcher doesn't
Expand All @@ -190,20 +250,25 @@ func (p *triePrefetcher) trie(root common.Hash) Trie {
if p.fetches != nil {
trie := p.fetches[root]
if trie == nil {
p.deliveryMissMeter.Mark(1)
return nil
}
return p.db.CopyTrie(trie)
}
// Otherwise the prefetcher is active, bail if no trie was prefetched for this root

p.fetchersMutex.RLock()
fetcher := p.fetchers[root]
p.fetchersMutex.RUnlock()
if fetcher == nil {
p.deliveryMissMeter.Mark(1)
return nil
}

// Interrupt the prefetcher if it's by any chance still running and return
// a copy of any pre-loaded trie.
p.abortChan <- fetcher // safe to do multiple times
select {
case <-p.closeAbortChan:
case p.abortChan <- fetcher: // safe to do multiple times
}

trie := fetcher.peek()
if trie == nil {
Expand All @@ -216,8 +281,23 @@ func (p *triePrefetcher) trie(root common.Hash) Trie {
// used marks a batch of state items used to allow creating statistics as to
// how useful or wasteful the prefetcher is.
func (p *triePrefetcher) used(root common.Hash, used [][]byte) {
if fetcher := p.fetchers[root]; fetcher != nil {
fetcher.used = used
if !metrics.EnabledExpensive {
return
}
// If the prefetcher is an inactive one, bail out
if p.fetches != nil {
return
}
select {
case <-p.closeMainChan:
default:
p.fetchersMutex.RLock()
if fetcher := p.fetchers[root]; fetcher != nil {
fetcher.lock.Lock()
fetcher.used = used
fetcher.lock.Unlock()
}
p.fetchersMutex.RUnlock()
}
}

Expand Down
26 changes: 18 additions & 8 deletions core/state/trie_prefetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,33 @@ func filledStateDB() *StateDB {
return state
}

func prefetchGuaranteed(prefetcher *triePrefetcher, root common.Hash, keys [][]byte, accountHash common.Hash) {
prefetcher.prefetch(root, keys, accountHash)
for {
if len(prefetcher.prefetchChan) == 0 {
return
}
time.Sleep(1 * time.Millisecond)
}
}

func TestCopyAndClose(t *testing.T) {
db := filledStateDB()
prefetcher := newTriePrefetcher(db.db, db.originalRoot, "")
skey := common.HexToHash("aaa")
prefetcher.prefetch(db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
prefetcher.prefetch(db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
prefetchGuaranteed(prefetcher, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
prefetchGuaranteed(prefetcher, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
time.Sleep(1 * time.Second)
a := prefetcher.trie(db.originalRoot)
prefetcher.prefetch(db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
prefetchGuaranteed(prefetcher, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
b := prefetcher.trie(db.originalRoot)
cpy := prefetcher.copy()
cpy.prefetch(db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
cpy.prefetch(db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
prefetchGuaranteed(cpy, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
prefetchGuaranteed(cpy, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
c := cpy.trie(db.originalRoot)
prefetcher.close()
cpy2 := cpy.copy()
cpy2.prefetch(db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
prefetchGuaranteed(cpy2, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
d := cpy2.trie(db.originalRoot)
cpy.close()
cpy2.close()
Expand All @@ -72,7 +82,7 @@ func TestUseAfterClose(t *testing.T) {
db := filledStateDB()
prefetcher := newTriePrefetcher(db.db, db.originalRoot, "")
skey := common.HexToHash("aaa")
prefetcher.prefetch(db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
prefetchGuaranteed(prefetcher, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
a := prefetcher.trie(db.originalRoot)
prefetcher.close()
b := prefetcher.trie(db.originalRoot)
Expand All @@ -88,7 +98,7 @@ func TestCopyClose(t *testing.T) {
db := filledStateDB()
prefetcher := newTriePrefetcher(db.db, db.originalRoot, "")
skey := common.HexToHash("aaa")
prefetcher.prefetch(db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
prefetchGuaranteed(prefetcher, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
cpy := prefetcher.copy()
a := prefetcher.trie(db.originalRoot)
b := cpy.trie(db.originalRoot)
Expand Down

0 comments on commit 8e74562

Please sign in to comment.