GOCBC-1520: Add concurrency back into the range scan API & implement load balancing

Motivation
==========
The RFC requires SDKs that implement concurrency for range scan to distribute scans across nodes in the cluster as evenly as possible.

Changes
=======
* Add rangeScanLoadBalancer that handles the distribution of scans across the nodes
* The rangeScanOpManager uses the rangeScanLoadBalancer to decide which vbucket to scan next
* Add Concurrency to ScanOptions

Results
=======
All tests pass (including FIT)

Change-Id: Ib01398f723ba2def7d460b36d18404066dd51ac6
Reviewed-on: https://review.couchbase.org/c/gocb/+/207078
Tested-by: Build Bot <[email protected]>
Reviewed-by: Charles Dixon <[email protected]>
DemetrisChr committed Apr 4, 2024
1 parent 7fb764b commit cd878d8
Showing 5 changed files with 287 additions and 52 deletions.
8 changes: 8 additions & 0 deletions collection_rangescan.go
@@ -28,6 +28,14 @@ type ScanOptions struct {
// Defaults to 50. A value of 0 is equivalent to no limit.
BatchItemLimit *uint32

// Concurrency specifies the maximum number of scans that can be active at the same time.
// Defaults to 1. Care must be taken to ensure that the server does not run out of resources due to concurrent scans.
//
// # UNCOMMITTED
//
// This API is UNCOMMITTED and may change in the future.
Concurrency uint16

// Internal: This should never be used and is not supported.
Internal struct {
User string
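For illustration only, a minimal sketch of how the new option might be consumed from application code. This is not part of the commit: it assumes an already-connected *gocb.Collection named col, imports of "fmt" and gocb "github.com/couchbase/gocb/v2", and that a zero-value gocb.RangeScan (nil terms) means a full-range scan.

// scanAll runs a full range scan with up to four partition scans in
// flight at once, printing each document ID as it arrives.
func scanAll(col *gocb.Collection) error {
	res, err := col.Scan(gocb.RangeScan{}, &gocb.ScanOptions{
		Concurrency: 4,
	})
	if err != nil {
		return err
	}
	for item := res.Next(); item != nil; item = res.Next() {
		fmt.Printf("key: %s\n", item.ID())
	}
	// Err reports any failure that terminated the stream early.
	return res.Err()
}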
18 changes: 17 additions & 1 deletion kvprovider_core.go
@@ -27,7 +27,7 @@ func (p *kvProviderCore) Scan(c *Collection, scanType ScanType, opts *ScanOption
opm.SetTimeout(opts.Timeout)
opm.SetItemLimit(opts.BatchItemLimit)
opm.SetByteLimit(opts.BatchByteLimit)
opm.SetMaxConcurrency(1)
opm.SetMaxConcurrency(opts.Concurrency)

config, err := p.snapshotProvider.WaitForConfigSnapshot(opts.Context, time.Now().Add(opm.Timeout()))
if err != nil {
@@ -48,6 +48,22 @@ func (p *kvProviderCore) Scan(c *Collection, scanType ScanType, opts *ScanOption

opm.SetNumVbuckets(numVbuckets)

serverToVbucketMap := make(map[int][]uint16)
numServers, err := config.NumServers()
if err != nil {
opm.Finish()
return nil, err
}
for serverIndex := 0; serverIndex < numServers; serverIndex++ {
vbuckets, err := config.VbucketsOnServer(serverIndex)
if err != nil {
opm.Finish()
return nil, err
}
serverToVbucketMap[serverIndex] = vbuckets
}
opm.SetServerToVbucketMap(serverToVbucketMap)

cid, err := p.getCollectionID(opts.Context, c, opm.TraceSpan(), opm.Timeout(), opm.Impersonate())
if err != nil {
opm.Finish()
2 changes: 2 additions & 0 deletions kvprovider_core_provider.go
@@ -41,6 +41,8 @@ type coreConfigSnapshot interface {
RevID() int64
NumVbuckets() (int, error)
NumReplicas() (int, error)
NumServers() (int, error)
VbucketsOnServer(index int) ([]uint16, error)
}

type stdCoreConfigSnapshotProvider struct {
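The two methods added to coreConfigSnapshot are what Scan uses above to build its server-to-vbucket map. As a hedged sketch of how they fit together, here is a hypothetical test double plus the same map-building loop; fakeSnapshot and its data are invented for illustration and are not part of the commit.

// fakeSnapshot is a hypothetical stand-in implementing only the two
// new methods of the snapshot interface.
type fakeSnapshot struct {
	vbucketsByServer map[int][]uint16
}

func (s *fakeSnapshot) NumServers() (int, error) {
	return len(s.vbucketsByServer), nil
}

func (s *fakeSnapshot) VbucketsOnServer(index int) ([]uint16, error) {
	return s.vbucketsByServer[index], nil
}

// buildServerToVbucketMap mirrors the loop added to kvProviderCore.Scan:
// one entry per server index, listing the vbuckets that server owns.
func buildServerToVbucketMap(snap *fakeSnapshot) (map[int][]uint16, error) {
	numServers, err := snap.NumServers()
	if err != nil {
		return nil, err
	}
	m := make(map[int][]uint16, numServers)
	for i := 0; i < numServers; i++ {
		vbs, err := snap.VbucketsOnServer(i)
		if err != nil {
			return nil, err
		}
		m[i] = vbs
	}
	return m, nil
}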
171 changes: 128 additions & 43 deletions rangescanopmanager.go
@@ -5,7 +5,9 @@ import (
"encoding/hex"
"errors"
"io"
"math"
"math/rand"
"sync"
"sync/atomic"
"time"
"unsafe"
@@ -51,17 +53,23 @@ type rangeScanOpManager struct {
samplingOptions *gocbcore.RangeScanCreateRandomSamplingConfig
vBucketToSnapshotOpts map[uint16]gocbcore.RangeScanCreateSnapshotRequirements

numVbuckets int
keysOnly bool
itemLimit uint32
byteLimit uint32
maxConcurrency uint16
numVbuckets int
serverToVbucketMap map[int][]uint16
keysOnly bool
itemLimit uint32
byteLimit uint32
maxConcurrency uint16

result *ScanResult

cancelled uint32
}

type rangeScanVbucket struct {
id uint16
server int
}

func (p *kvProviderCore) newRangeScanOpManager(c *Collection, scanType ScanType, agent kvProviderCoreProvider,
parentSpan RequestSpan, consistentWith *MutationState, keysOnly bool) (*rangeScanOpManager, error) {
var tracectx RequestSpanContext
@@ -210,6 +218,10 @@ func (m *rangeScanOpManager) SetNumVbuckets(numVbuckets int) {
m.span.SetAttribute("num_partitions", numVbuckets)
}

func (m *rangeScanOpManager) SetServerToVbucketMap(serverVbucketMap map[int][]uint16) {
m.serverToVbucketMap = serverVbucketMap
}

func (m *rangeScanOpManager) SetTimeout(timeout time.Duration) {
m.timeout = timeout
}
@@ -312,7 +324,7 @@ func (m *rangeScanOpManager) CheckReadyForOp() error {
m.deadline = time.Now().Add(timeout)

if m.numVbuckets == 0 {
return errors.New("range sacn op manager had no number of partitions specified")
return errors.New("range scan op manager had no number of partitions specified")
}

return nil
@@ -371,7 +383,7 @@ func (m *rangeScanOpManager) Scan(ctx context.Context) (*ScanResult, error) {
}
m.SetResult(r)

vbucketCh := m.createVbucketChannel()
balancer := m.createLoadBalancer()

var complete uint32
var seenData uint32
@@ -387,32 +399,30 @@ func (m *rangeScanOpManager) Scan(ctx context.Context) (*ScanResult, error) {
defer func() {
if atomic.AddUint32(&complete, 1) == uint32(m.maxConcurrency) {
m.Finish()
close(vbucketCh)
balancer.close()
close(resultCh)
}
}()

for vbID := range vbucketCh {
for vbucket, ok := balancer.selectVbucket(); ok; vbucket, ok = balancer.selectVbucket() {
if atomic.LoadUint32(&m.cancelled) == 1 {
return
}

deadline := time.Now().Add(m.Timeout())
failPoint, err := m.scanPartition(ctx, deadline, vbID, resultCh)
failPoint, err := m.scanPartition(ctx, deadline, vbucket.id, resultCh)
balancer.scanEnded(vbucket)
if err != nil {
err = m.EnhanceErr(err)
if failPoint == scanFailPointCreate {
if errors.Is(err, gocbcore.ErrDocumentNotFound) {
logDebugf("Ignoring vbid %d as no documents exist for that vbucket", vbID)
if len(vbucketCh) == 0 {
return
}
logDebugf("Ignoring vbid %d as no documents exist for that vbucket", vbucket.id)
continue
}

if errors.Is(err, ErrTemporaryFailure) || errors.Is(err, gocbcore.ErrBusy) {
// Put the vbucket back into the channel to be retried later.
vbucketCh <- vbID
balancer.retryScan(vbucket)

if errors.Is(err, gocbcore.ErrBusy) {
// Busy indicates that the server is reporting too many active scans.
@@ -425,18 +435,10 @@ func (m *rangeScanOpManager) Scan(ctx context.Context) (*ScanResult, error) {
}
}

if len(vbucketCh) == 0 {
return
}

continue
}

if !m.IsRangeScan() {
// We can ignore stream create errors for sampling scans.
if len(vbucketCh) == 0 {
return
}
continue
}

@@ -477,16 +479,8 @@ func (m *rangeScanOpManager) Scan(ctx context.Context) (*ScanResult, error) {
return
}

if len(vbucketCh) == 0 {
return
}

continue
}

if len(vbucketCh) == 0 {
return
}
}
}()
}
@@ -717,20 +711,111 @@ func (m *rangeScanOpManager) cancelStream(ctx context.Context, spanCtx RequestSp
}
}

func (m *rangeScanOpManager) createVbucketChannel() chan uint16 {
var vbuckets []uint16
for vbucket := 0; vbucket < m.numVbuckets; vbucket++ {
vbuckets = append(vbuckets, uint16(vbucket))
func (m *rangeScanOpManager) createLoadBalancer() *rangeScanLoadBalancer {
var seed int64
if m.SamplingOptions() != nil && m.SamplingOptions().Seed != 0 {
// Using the sampling scan seed for the load balancer ensures that when concurrency is 1 the vbuckets are
// always scanned in the same order for a given seed
seed = int64(m.SamplingOptions().Seed)
} else {
seed = time.Now().UnixNano()
}
rand.Seed(time.Now().UnixNano())
rand.Shuffle(m.numVbuckets, func(i, j int) {
vbuckets[i], vbuckets[j] = vbuckets[j], vbuckets[i]
})

vbucketCh := make(chan uint16, m.numVbuckets)
for _, vbucket := range vbuckets {
vbucketCh <- vbucket
return newRangeScanLoadBalancer(m.serverToVbucketMap, seed)
}

type rangeScanLoadBalancer struct {
vbucketChannels map[int]chan uint16
servers []int
activeScansPerNode sync.Map
selectLock sync.Mutex
}

func newRangeScanLoadBalancer(serverToVbucketMap map[int][]uint16, seed int64) *rangeScanLoadBalancer {
b := &rangeScanLoadBalancer{
vbucketChannels: make(map[int]chan uint16),
activeScansPerNode: sync.Map{},
}

return vbucketCh
for server, vbuckets := range serverToVbucketMap {
b.servers = append(b.servers, server)

b.vbucketChannels[server] = make(chan uint16, len(vbuckets))

r := rand.New(rand.NewSource(seed)) // #nosec G404
r.Shuffle(len(vbuckets), func(i, j int) {
vbuckets[i], vbuckets[j] = vbuckets[j], vbuckets[i]
})

for _, vbucket := range vbuckets {
b.vbucketChannels[server] <- vbucket
}
}

return b
}

func (b *rangeScanLoadBalancer) retryScan(vbucket rangeScanVbucket) {
b.vbucketChannels[vbucket.server] <- vbucket.id
}

func (b *rangeScanLoadBalancer) scanEnded(vbucket rangeScanVbucket) {
zeroVal := uint32(0)
val, _ := b.activeScansPerNode.LoadOrStore(vbucket.server, &zeroVal)
atomic.AddUint32(val.(*uint32), ^uint32(0))
}

func (b *rangeScanLoadBalancer) scanStarting(vbucket rangeScanVbucket) {
zeroVal := uint32(0)
val, _ := b.activeScansPerNode.LoadOrStore(vbucket.server, &zeroVal)
atomic.AddUint32(val.(*uint32), uint32(1))
}

// close closes all the vbucket channels. This should only be called if no more vbucket scans will happen, i.e. selectVbucket should not be called after close.
func (b *rangeScanLoadBalancer) close() {
for _, ch := range b.vbucketChannels {
close(ch)
}
}

// selectVbucket returns the vbucket id, alongside the corresponding node index for a vbucket that is on the node with
// the smallest number of active scans. The boolean return value is false if there are no more vbuckets to scan.
func (b *rangeScanLoadBalancer) selectVbucket() (rangeScanVbucket, bool) {
b.selectLock.Lock()
defer b.selectLock.Unlock()

var selectedServer int
selected := false
min := uint32(math.MaxUint32)

for s := range b.servers {
if len(b.vbucketChannels[s]) == 0 {
continue
}
zeroVal := uint32(0)
val, _ := b.activeScansPerNode.LoadOrStore(s, &zeroVal)
activeScans := *val.(*uint32)
if activeScans < min {
min = activeScans
selectedServer = s
selected = true
}
}

if !selected {
return rangeScanVbucket{}, false
}

selectedVbucket, ok := <-b.vbucketChannels[selectedServer]
if !ok {
// This should be unreachable. selectVbucket should not be called after close.
logWarnf("Vbucket channel has been closed before the range scan has finished")
return rangeScanVbucket{}, false
}
vbucket := rangeScanVbucket{
id: selectedVbucket,
server: selectedServer,
}
b.scanStarting(vbucket)
return vbucket, true
}
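Two implementation details in the balancer are worth a closer look. First, createLoadBalancer reuses the sampling seed so that, as the in-code comment notes, a sampling scan with a fixed seed visits vbuckets in a reproducible order when Concurrency is 1. A self-contained sketch of that property, using only math/rand; the vbucket values and seed are made up.

package main

import (
	"fmt"
	"math/rand"
)

// shuffledVbuckets returns a copy of vbuckets shuffled by a source
// seeded with seed, so the result is a pure function of its inputs.
func shuffledVbuckets(vbuckets []uint16, seed int64) []uint16 {
	out := append([]uint16(nil), vbuckets...)
	r := rand.New(rand.NewSource(seed)) // #nosec G404 -- not security-sensitive
	r.Shuffle(len(out), func(i, j int) {
		out[i], out[j] = out[j], out[i]
	})
	return out
}

func main() {
	// Prints the same order on every run, unlike the global rand source.
	fmt.Println(shuffledVbuckets([]uint16{0, 1, 2, 3, 4, 5, 6, 7}, 42))
}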
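Second, scanEnded decrements its per-node counter with atomic.AddUint32(ptr, ^uint32(0)). There is no subtract variant of AddUint32; the idiom documented in sync/atomic is to add the two's-complement bit pattern of -1, which wraps around to a decrement. A minimal demonstration:

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var active uint32
	atomic.AddUint32(&active, 1)          // increment: active == 1
	atomic.AddUint32(&active, ^uint32(0)) // wraps around: active == 0
	fmt.Println(active)                   // prints 0
}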