fix(getter): redundancy getter cleanup
istae committed Mar 8, 2024
1 parent ff4ecc6 commit b83834a
Showing 3 changed files with 48 additions and 74 deletions.
3 changes: 0 additions & 3 deletions pkg/api/metrics.go
@@ -108,9 +108,6 @@ type UpgradedResponseWriter interface {
http.Pusher
http.Hijacker
http.Flusher
// staticcheck SA1019 CloseNotifier interface is required by gorilla compress handler
// nolint:staticcheck
http.CloseNotifier
}

type responseWriter struct {
116 changes: 48 additions & 68 deletions pkg/file/redundancy/getter/getter.go
@@ -7,7 +7,6 @@ package getter
import (
"context"
"errors"
"io"
"sync"
"sync/atomic"

@@ -26,42 +25,33 @@
// it retrieves children of an intermediate chunk, potentially using erasure decoding
// it caches sibling chunks if erasure decoding has already started
type decoder struct {
ctx context.Context
fetcher storage.Getter // network retrieval interface to fetch chunks
putter storage.Putter // interface to local storage to save reconstructed chunks
addrs []swarm.Address // all addresses of the intermediate chunk
inflight []atomic.Bool // locks to protect wait channels and RS buffer
cache map[string]int // map from chunk address to shard position index
waits []chan error // wait channels for each chunk
rsbuf [][]byte // RS buffer of data + parity shards for erasure decoding
goodRecovery chan struct{} // signal channel for successful retrieval of shardCnt chunks
badRecovery chan struct{} // signals that the recovery either failed or was not allowed to run
lastLen int // length of the last data chunk in the RS buffer
shardCnt int // number of data shards
parityCnt int // number of parity shards
wg sync.WaitGroup // wait group to wait for all goroutines to finish
mu sync.Mutex // mutex to protect buffer
fetchedCnt atomic.Int32 // count successful retrievals
failedCnt atomic.Int32 // count failed retrievals
cancel func() // cancel function for RS decoding
remove func(error) // callback to remove decoder from decoders cache
config Config // configuration
logger log.Logger
}

type Getter interface {
storage.Getter
io.Closer
fetcher storage.Getter // network retrieval interface to fetch chunks
putter storage.Putter // interface to local storage to save reconstructed chunks
addrs []swarm.Address // all addresses of the intermediate chunk
inflight []atomic.Bool // locks to protect wait channels and RS buffer
cache map[string]int // map from chunk address to shard position index
waits []chan error // wait channels for each chunk
rsbuf [][]byte // RS buffer of data + parity shards for erasure decoding
goodRecovery chan struct{} // signal channel for successful retrieval of shardCnt chunks
badRecovery chan struct{} // signals that the recovery either failed or was not allowed to run
lastLen int // length of the last data chunk in the RS buffer
shardCnt int // number of data shards
parityCnt int // number of parity shards
wg sync.WaitGroup // wait group to wait for all goroutines to finish
mu sync.Mutex // mutex to protect buffer
fetchedCnt atomic.Int32 // count successful retrievals
failedCnt atomic.Int32 // count failed retrievals
remove func(error) // callback to remove decoder from decoders cache
config Config // configuration
prefetchTriggered bool // indicates that prefetch has been triggered
logger log.Logger
}
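
The new struct drops the decoder's global context and cancel function along with the exported Getter interface; recovery outcomes now travel only through the goodRecovery and badRecovery signal channels. These rely on the close-to-broadcast idiom: closing a channel wakes every waiter at once. A minimal self-contained sketch of that idiom, with assumed names rather than the bee API:

package main

import (
	"errors"
	"fmt"
	"sync"
)

// recovery mirrors the decoder's pair of signal channels.
type recovery struct {
	good chan struct{} // closed on successful recovery
	bad  chan struct{} // closed when recovery failed or was never allowed
}

// wait blocks until one of the two channels is closed, like fetch does.
func (r *recovery) wait() error {
	select {
	case <-r.good:
		return nil
	case <-r.bad:
		return errors.New("recovery failed")
	}
}

func main() {
	r := &recovery{good: make(chan struct{}), bad: make(chan struct{})}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ { // three waiters, like concurrent inflight fetches
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			fmt.Println("waiter", i, "->", r.wait())
		}(i)
	}
	close(r.good) // a single close unblocks all waiters at once
	wg.Wait()
}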

// New returns a decoder object used to retrieve children of an intermediate chunk
func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter, remove func(error), conf Config) Getter {
// global context is canceled when the Close is called or when the prefetch terminates
ctx, cancel := context.WithCancel(context.Background())
func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter, remove func(error), conf Config) storage.Getter {
size := len(addrs)

d := &decoder{
ctx: ctx,
fetcher: g,
putter: p,
addrs: addrs,
@@ -71,7 +61,6 @@ func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter
rsbuf: make([][]byte, size),
goodRecovery: make(chan struct{}),
badRecovery: make(chan struct{}),
cancel: cancel,
remove: remove,
shardCnt: shardCnt,
parityCnt: size - shardCnt,
@@ -91,13 +80,12 @@ func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter

// prefetch chunks according to strategy
if !conf.Strict || conf.Strategy != NONE {
d.prefetchTriggered = true
d.wg.Add(1)
go func() {
defer d.wg.Done()
_ = d.prefetch(ctx)
d.prefetch()
}()
} else { // recovery not allowed
close(d.badRecovery)
}

return d
@@ -110,7 +98,7 @@ func (g *decoder) Get(ctx context.Context, addr swarm.Address) (swarm.Chunk, err
if !ok {
return nil, storage.ErrNotFound
}
err := g.fetch(ctx, i, true)
err := g.fetch(ctx, i, g.prefetchTriggered)
if err != nil {
return nil, err
}
@@ -144,15 +132,16 @@ func (g *decoder) fetch(ctx context.Context, i int, waitForRecovery bool) (err e
fctx, cancel := context.WithTimeout(ctx, g.config.FetchTimeout)
defer cancel()

g.wg.Add(1)
go func() {
select {
case <-fctx.Done(): // local context
case <-g.ctx.Done(): // global context
}
cancel()
g.wg.Done()
}()
// when the already-running recovery process terminates, any inflight requests can be canceled.
// the extra bool check avoids firing an unnecessary goroutine
if waitForRecovery {
g.wg.Add(1)
go func() {
defer g.wg.Done()
defer cancel()
_ = waitRecovery(nil)
}()
}

// retrieval
ch, err := g.fetcher.Get(fctx, g.addrs[i])
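
With the global context gone, fetch no longer spawns a watcher goroutine unconditionally: the in-flight request is tied to the recovery outcome only when waitForRecovery is set. A generic sketch of this conditional cancel-on-signal pattern, where fetchWithAbort and its signature are assumptions for illustration:

package sketch

import "context"

// fetchWithAbort cancels the in-flight request when done is closed, but only
// pays for the watcher goroutine when watching is actually requested.
func fetchWithAbort(ctx context.Context, done <-chan struct{}, watch bool, get func(context.Context) error) error {
	fctx, cancel := context.WithCancel(ctx)
	defer cancel()
	if watch {
		go func() {
			select {
			case <-done: // recovery terminated: abort the request
			case <-fctx.Done(): // request finished or timed out on its own
			}
			cancel()
		}()
	}
	return get(fctx)
}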
@@ -181,9 +170,18 @@ func (g *decoder) fetch(ctx context.Context, i int, waitForRecovery bool) (err e
return waitRecovery(storage.ErrNotFound)
}

func (g *decoder) prefetch(ctx context.Context) (err error) {
defer g.remove(err)
defer g.cancel()
func (g *decoder) prefetch() {

var err error
defer func() {
if err != nil {
close(g.badRecovery)
}
g.remove(err)
}()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

run := func(s Strategy) error {
if err := g.runStrategy(ctx, s); err != nil {
@@ -194,7 +192,6 @@ func New(addrs []swarm.Address, shardCnt int, g storage.Getter, p storage.Putter
}

for s := g.config.Strategy; s < strategyCnt; s++ {

err = run(s)
if err != nil {
if s == DATA || s == RACE {
Expand All @@ -206,19 +203,12 @@ func (g *decoder) prefetch(ctx context.Context) (err error) {
g.logger.Debug("successful recovery", "strategy", s)
}
close(g.goodRecovery)
break
return
}
if g.config.Strict { // only run one strategy
break
return
}
}

if err != nil {
close(g.badRecovery)
return err
}

return err
}
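
prefetch now creates and cancels its own context and reports failure through the deferred close(g.badRecovery), so the loop can return directly on success or after a strict single attempt. The escalation logic, reduced to a standalone sketch (the enum below is a simplified assumption; the package defines its own Strategy values):

package sketch

type Strategy int

const (
	NONE Strategy = iota // no recovery allowed
	DATA                 // fetch data shards first
	RACE                 // race data and parity retrievals
	strategyCnt
)

// escalate tries each strategy from start upward and stops at the first
// success; a non-nil result means every permitted strategy failed.
func escalate(start Strategy, strict bool, run func(Strategy) error) error {
	var err error
	for s := start; s < strategyCnt; s++ {
		if err = run(s); err == nil {
			return nil // success: the caller closes goodRecovery
		}
		if strict { // only the configured strategy may run
			break
		}
	}
	return err // failure: the deferred close(badRecovery) fires
}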

func (g *decoder) runStrategy(ctx context.Context, s Strategy) error {
@@ -379,13 +369,3 @@ func (g *decoder) save(ctx context.Context, missing []int) error {
}
return nil
}

// Close terminates the prefetch loop, waits for all goroutines to finish and
// removes the decoder from the cache
// it implements the io.Closer interface
func (g *decoder) Close() error {
g.cancel()
g.wg.Wait()
g.remove(nil)
return nil
}
3 changes: 0 additions & 3 deletions pkg/file/redundancy/getter/getter_test.go
@@ -23,7 +23,6 @@ import (
inmem "github.com/ethersphere/bee/pkg/storage/inmemchunkstore"
mockstorer "github.com/ethersphere/bee/pkg/storer/mock"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/util/testutil"
"github.com/klauspost/reedsolomon"
"golang.org/x/sync/errgroup"
)
@@ -112,7 +111,6 @@ func testDecodingRACE(t *testing.T, bufSize, shardCnt, erasureCnt int) {
}

g := getter.New(addrs, shardCnt, store, store, func(error) {}, getter.DefaultConfig)
testutil.CleanupCloser(t, g)

parityCnt := len(buf) - shardCnt
_, err := g.Get(context.Background(), addr)
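
As this test diff shows, callers no longer need testutil.CleanupCloser or a deferred Close: New returns a plain storage.Getter, and teardown happens through the remove callback once prefetch terminates. Caller code after this commit reduces to roughly the following sketch, with store, addrs, shardCnt and addr set up as in the surrounding test:

g := getter.New(addrs, shardCnt, store, store, func(error) {}, getter.DefaultConfig)
// no defer g.Close(): the prefetch goroutine owns its own context and
// invokes the remove callback when it finishes.
ch, err := g.Get(context.Background(), addr)
if err != nil {
	// handle retrieval/recovery failure
}
_ = ch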
@@ -176,7 +174,6 @@ func testDecodingFallback(t *testing.T, s getter.Strategy, strict bool) {
FetchTimeout: strategyTimeout / 2,
}
g := getter.New(addrs, shardCnt, store, store, func(error) {}, conf)
defer g.Close()

// launch delayed and erased chunk retrieval
wg := sync.WaitGroup{}
