Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix data race in roundRobinSync #3862

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions beacon-chain/sync/initial-sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ go_test(
name = "go_default_test",
srcs = ["round_robin_test.go"],
embed = [":go_default_library"],
race = "on",
tags = ["race_on"],
deps = [
"//beacon-chain/blockchain/testing:go_default_library",
"//beacon-chain/p2p/testing:go_default_library",
Expand Down
42 changes: 24 additions & 18 deletions beacon-chain/sync/initial-sync/round_robin.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"fmt"
"io"
"sort"
"sync"
"sync/atomic"
"time"

"github.com/libp2p/go-libp2p-core/peer"
Expand Down Expand Up @@ -42,13 +42,10 @@ func (s *InitialSync) roundRobinSync(genesis time.Time) error {
counter := ratecounter.NewRateCounter(counterSeconds * time.Second)

var lastEmptyRequests int
errChan := make(chan error)
// Step 1 - Sync to end of finalized epoch.
for s.chain.HeadSlot() < helpers.StartSlot(highestFinalizedEpoch()+1) {
root, finalizedEpoch, peers := bestFinalized()

var blocks []*eth.BeaconBlock

// request a range of blocks to be requested from multiple peers.
// Example:
// - number of peers = 4
Expand All @@ -61,11 +58,14 @@ func (s *InitialSync) roundRobinSync(genesis time.Time) error {
if len(peers) == 0 {
return nil, errors.WithStack(errors.New("no peers left to request blocks"))
}
var wg sync.WaitGroup
var p2pRequestCount int32
errChan := make(chan error)
blocksChan := make(chan []*eth.BeaconBlock)

// Handle block large block ranges of skipped slots.
start += count * uint64(lastEmptyRequests*len(peers))

atomic.AddInt32(&p2pRequestCount, int32(len(peers)))
for i, pid := range peers {
if ctx.Err() != nil {
return nil, ctx.Err()
Expand All @@ -90,10 +90,14 @@ func (s *InitialSync) roundRobinSync(genesis time.Time) error {
Step: step,
}

// Fulfill requests asynchronously, in parallel, and wait for results from all.
wg.Add(1)
go func(i int, pid peer.ID) {
defer wg.Done()
defer func() {
zeroIfIAmTheLast := atomic.AddInt32(&p2pRequestCount, -1)
if zeroIfIAmTheLast == 0 {
close(blocksChan)
}
}()

resp, err := s.requestBlocks(ctx, req, pid)
log.WithField("peer", pid.Pretty()).Debugf("Received %d blocks", len(resp))
if err != nil {
Expand All @@ -110,28 +114,30 @@ func (s *InitialSync) roundRobinSync(genesis time.Time) error {
errChan <- errors.WithStack(errors.New("no peers left to request blocks"))
return
}
_, err = request(start, step, count/uint64(len(ps)) /*count*/, ps, int(count)%len(ps) /*remainder*/)
resp, err = request(start, step, count/uint64(len(ps)) /*count*/, ps, int(count)%len(ps) /*remainder*/)
if err != nil {
errChan <- err
return
}
}
blocks = append(blocks, resp...)
blocksChan <- resp
}(i, pid)
}

// Wait for done signal or any error.
done := make(chan interface{})
go func() {
wg.Wait()
done <- true
}()
var unionRespBlocks []*eth.BeaconBlock
for {
select {
case err := <-errChan:
return nil, err
case <-done:
return blocks, nil
case resp, ok := <-blocksChan:
if ok {
// if this synchronization becomes a bottleneck:
// think about immediately allocating space for all peers in unionRespBlocks,
// and write without synchronization
unionRespBlocks = append(unionRespBlocks, resp...)
} else {
return unionRespBlocks, nil
}
}
}
}
Expand Down