Skip to content

Commit

Permalink
Store Gateway: Add pre add block ownership check (#6483)
Browse files Browse the repository at this point in the history
* pre add block hook

Signed-off-by: Ben Ye <[email protected]>

* fix go sum

Signed-off-by: Ben Ye <[email protected]>

* fix lint

Signed-off-by: Ben Ye <[email protected]>

---------

Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
yeya24 authored Jan 6, 2025
1 parent b3a7a55 commit 12412e6
Show file tree
Hide file tree
Showing 5 changed files with 239 additions and 0 deletions.
5 changes: 5 additions & 0 deletions pkg/storegateway/bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,11 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro
store.WithLazyExpandedPostings(u.cfg.BucketStore.LazyExpandedPostingsEnabled),
store.WithPostingGroupMaxKeySeriesRatio(u.cfg.BucketStore.LazyExpandedPostingGroupMaxKeySeriesRatio),
store.WithDontResort(true), // Cortex doesn't need to resort series in store gateway.
store.WithBlockLifecycleCallback(&shardingBlockLifecycleCallbackAdapter{
userID: userID,
strategy: u.shardingStrategy,
logger: userLogger,
}),
}
if u.logLevel.String() == "debug" {
bucketStoreOpts = append(bucketStoreOpts, store.WithDebugLogging())
Expand Down
8 changes: 8 additions & 0 deletions pkg/storegateway/bucket_stores_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -889,6 +889,14 @@ func (u *userShardingStrategy) FilterBlocks(ctx context.Context, userID string,
return nil
}

func (u *userShardingStrategy) OwnBlock(userID string, _ thanos_metadata.Meta) (bool, error) {
if util.StringsContain(u.users, userID) {
return true, nil
}

return false, nil
}

// failFirstGetBucket is an objstore.Bucket wrapper which fails the first Get() request with a mocked error.
type failFirstGetBucket struct {
objstore.Bucket
Expand Down
5 changes: 5 additions & 0 deletions pkg/storegateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1307,6 +1307,11 @@ func (m *mockShardingStrategy) FilterBlocks(ctx context.Context, userID string,
return args.Error(0)
}

func (m *mockShardingStrategy) OwnBlock(userID string, meta metadata.Meta) (bool, error) {
args := m.Called(userID, meta)
return args.Bool(0), args.Error(1)
}

func createBucketIndex(t *testing.T, bkt objstore.Bucket, userID string) *bucketindex.Index {
updater := bucketindex.NewUpdater(bkt, userID, nil, log.NewNopLogger())
idx, _, _, err := updater.UpdateIndex(context.Background(), nil)
Expand Down
52 changes: 52 additions & 0 deletions pkg/storegateway/sharding_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package storegateway

import (
"context"
"errors"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand All @@ -19,6 +20,10 @@ const (
shardExcludedMeta = "shard-excluded"
)

var (
errBlockNotOwned = errors.New("block not owned")
)

type ShardingStrategy interface {
// FilterUsers whose blocks should be loaded by the store-gateway. Returns the list of user IDs
// that should be synced by the store-gateway.
Expand All @@ -28,6 +33,9 @@ type ShardingStrategy interface {
// The provided loaded map contains blocks which have been previously returned by this function and
// are now loaded or loading in the store-gateway.
FilterBlocks(ctx context.Context, userID string, metas map[ulid.ULID]*metadata.Meta, loaded map[ulid.ULID]struct{}, synced block.GaugeVec) error

// OwnBlock checks if the block is owned by the current instance.
OwnBlock(userID string, meta metadata.Meta) (bool, error)
}

// ShardingLimits is the interface that should be implemented by the limits provider,
Expand Down Expand Up @@ -71,6 +79,10 @@ func (s *NoShardingStrategy) FilterBlocks(_ context.Context, _ string, _ map[uli
return nil
}

func (s *NoShardingStrategy) OwnBlock(_ string, meta metadata.Meta) (bool, error) {
return true, nil
}

// DefaultShardingStrategy is a sharding strategy based on the hash ring formed by store-gateways.
// Not go-routine safe.
type DefaultShardingStrategy struct {
Expand Down Expand Up @@ -102,6 +114,17 @@ func (s *DefaultShardingStrategy) FilterBlocks(_ context.Context, _ string, meta
return nil
}

func (s *DefaultShardingStrategy) OwnBlock(_ string, meta metadata.Meta) (bool, error) {
key := cortex_tsdb.HashBlockID(meta.ULID)

// Check if the block is owned by the store-gateway
set, err := s.r.Get(key, BlocksOwnerSync, nil, nil, nil)
if err != nil {
return false, err
}
return set.Includes(s.instanceAddr), nil
}

// ShuffleShardingStrategy is a shuffle sharding strategy, based on the hash ring formed by store-gateways,
// where each tenant blocks are sharded across a subset of store-gateway instances.
type ShuffleShardingStrategy struct {
Expand Down Expand Up @@ -151,6 +174,18 @@ func (s *ShuffleShardingStrategy) FilterBlocks(_ context.Context, userID string,
return nil
}

func (s *ShuffleShardingStrategy) OwnBlock(userID string, meta metadata.Meta) (bool, error) {
subRing := GetShuffleShardingSubring(s.r, userID, s.limits, s.zoneStableShuffleSharding)
key := cortex_tsdb.HashBlockID(meta.ULID)

// Check if the block is owned by the store-gateway
set, err := subRing.Get(key, BlocksOwnerSync, nil, nil, nil)
if err != nil {
return false, err
}
return set.Includes(s.instanceAddr), nil
}

func filterBlocksByRingSharding(r ring.ReadRing, instanceAddr string, metas map[ulid.ULID]*metadata.Meta, loaded map[ulid.ULID]struct{}, synced block.GaugeVec, logger log.Logger) {
bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()

Expand Down Expand Up @@ -275,3 +310,20 @@ func (a *shardingBucketReaderAdapter) Iter(ctx context.Context, dir string, f fu

return a.InstrumentedBucketReader.Iter(ctx, dir, f, options...)
}

type shardingBlockLifecycleCallbackAdapter struct {
userID string
strategy ShardingStrategy
logger log.Logger
}

func (a *shardingBlockLifecycleCallbackAdapter) PreAdd(meta metadata.Meta) error {
own, err := a.strategy.OwnBlock(a.userID, meta)
// If unable to check if block is owned or not because of ring error, mark it as owned
// and ignore the error.
if err != nil || own {
return nil
}
level.Info(a.logger).Log("msg", "block not owned from pre check", "block", meta.ULID.String())
return errBlockNotOwned
}
169 changes: 169 additions & 0 deletions pkg/storegateway/sharding_strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package storegateway

import (
"context"
"errors"
"fmt"
"strconv"
"testing"
Expand All @@ -11,7 +12,9 @@ import (
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/tsdb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/extprom"
Expand Down Expand Up @@ -272,6 +275,11 @@ func TestDefaultShardingStrategy(t *testing.T) {

for instanceAddr, expectedBlocks := range testData.expectedBlocks {
filter := NewDefaultShardingStrategy(r, instanceAddr, log.NewNopLogger(), nil)
for _, block := range expectedBlocks {
owned, err := filter.OwnBlock("user-1", metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block}})
require.NoError(t, err)
require.True(t, owned)
}
synced := extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{}, []string{"state"})
synced.WithLabelValues(shardExcludedMeta).Set(0)

Expand Down Expand Up @@ -657,6 +665,11 @@ func TestShuffleShardingStrategy(t *testing.T) {
// Assert on filter blocks.
for _, expected := range testData.expectedBlocks {
filter := NewShuffleShardingStrategy(r, expected.instanceID, expected.instanceAddr, testData.limits, log.NewNopLogger(), allowedTenants, zoneStableShuffleSharding) //nolint:govet
for _, block := range expected.blocks {
owned, err := filter.OwnBlock(userID, metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block}})
require.NoError(t, err)
require.True(t, owned)
}
synced := extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{}, []string{"state"})
synced.WithLabelValues(shardExcludedMeta).Set(0)

Expand Down Expand Up @@ -693,3 +706,159 @@ type shardingLimitsMock struct {
func (m *shardingLimitsMock) StoreGatewayTenantShardSize(_ string) float64 {
return m.storeGatewayTenantShardSize
}

func TestDefaultShardingStrategy_OwnBlock(t *testing.T) {
t.Parallel()
// The following block IDs have been picked to have increasing hash values
// in order to simplify the tests.
block1 := ulid.MustNew(1, nil) // hash: 283204220
block2 := ulid.MustNew(2, nil)
block1Hash := cortex_tsdb.HashBlockID(block1)
registeredAt := time.Now()
block2Hash := cortex_tsdb.HashBlockID(block2)

ctx := context.Background()
store, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

// Initialize the ring state.
require.NoError(t, store.CAS(ctx, "test", func(in interface{}) (interface{}, bool, error) {
d := ring.NewDesc()
d.AddIngester("instance-1", "127.0.0.1", "zone-a", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt)
d.AddIngester("instance-2", "127.0.0.2", "zone-b", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt)
return d, true, nil
}))

cfg := ring.Config{
ReplicationFactor: 1,
HeartbeatTimeout: time.Minute,
ZoneAwarenessEnabled: true,
}

r, err := ring.NewWithStoreClientAndStrategy(cfg, "test", "test", store, ring.NewIgnoreUnhealthyInstancesReplicationStrategy(), nil, nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, r))
defer services.StopAndAwaitTerminated(ctx, r) //nolint:errcheck

// Wait until the ring client has synced.
require.NoError(t, ring.WaitInstanceState(ctx, r, "instance-1", ring.ACTIVE))
filter := NewDefaultShardingStrategy(r, "127.0.0.1", log.NewNopLogger(), nil)
owned, err := filter.OwnBlock("", metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}})
require.NoError(t, err)
require.True(t, owned)
// Owned by 127.0.0.2
owned, err = filter.OwnBlock("", metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}})
require.NoError(t, err)
require.False(t, owned)

filter2 := NewDefaultShardingStrategy(r, "127.0.0.2", log.NewNopLogger(), nil)
owned, err = filter2.OwnBlock("", metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}})
require.NoError(t, err)
require.True(t, owned)
}

func TestShuffleShardingStrategy_OwnBlock(t *testing.T) {
t.Parallel()
// The following block IDs have been picked to have increasing hash values
// in order to simplify the tests.
block1 := ulid.MustNew(1, nil) // hash: 283204220
block2 := ulid.MustNew(2, nil)
block1Hash := cortex_tsdb.HashBlockID(block1)
registeredAt := time.Now()
block2Hash := cortex_tsdb.HashBlockID(block2)

ctx := context.Background()
store, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

// Initialize the ring state.
require.NoError(t, store.CAS(ctx, "test", func(in interface{}) (interface{}, bool, error) {
d := ring.NewDesc()
d.AddIngester("instance-1", "127.0.0.1", "zone-a", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt)
d.AddIngester("instance-2", "127.0.0.2", "zone-b", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt)
d.AddIngester("instance-3", "127.0.0.3", "zone-c", []uint32{block2Hash + 2}, ring.ACTIVE, registeredAt)
return d, true, nil
}))

cfg := ring.Config{
ReplicationFactor: 1,
HeartbeatTimeout: time.Minute,
ZoneAwarenessEnabled: true,
}
limits := &shardingLimitsMock{storeGatewayTenantShardSize: 2}

r, err := ring.NewWithStoreClientAndStrategy(cfg, "test", "test", store, ring.NewIgnoreUnhealthyInstancesReplicationStrategy(), nil, nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, r))
defer services.StopAndAwaitTerminated(ctx, r) //nolint:errcheck

// Wait until the ring client has synced.
require.NoError(t, ring.WaitInstanceState(ctx, r, "instance-1", ring.ACTIVE))
filter := NewShuffleShardingStrategy(r, "instance-1", "127.0.0.1", limits, log.NewNopLogger(), nil, true)
filter2 := NewShuffleShardingStrategy(r, "instance-2", "127.0.0.2", limits, log.NewNopLogger(), nil, true)

owned, err := filter.OwnBlock("user-1", metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}})
require.NoError(t, err)
require.True(t, owned)
// Owned by 127.0.0.2
owned, err = filter.OwnBlock("user-1", metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}})
require.NoError(t, err)
require.False(t, owned)

owned, err = filter2.OwnBlock("user-1", metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}})
require.NoError(t, err)
require.True(t, owned)
}

func TestShardingBlockLifecycleCallbackAdapter(t *testing.T) {
userID := "user-1"
logger := log.NewNopLogger()
block := ulid.MustNew(1, nil)
meta := metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block}}

for _, tc := range []struct {
name string
shardingStrategy func() ShardingStrategy
expectErr bool
}{
{
name: "own block",
shardingStrategy: func() ShardingStrategy {
s := &mockShardingStrategy{}
s.On("OwnBlock", mock.Anything, mock.Anything).Return(true, nil)
return s
},
},
{
name: "own block has error, still own block",
shardingStrategy: func() ShardingStrategy {
s := &mockShardingStrategy{}
s.On("OwnBlock", mock.Anything, mock.Anything).Return(false, errors.New("some error"))
return s
},
},
{
name: "not own block",
shardingStrategy: func() ShardingStrategy {
s := &mockShardingStrategy{}
s.On("OwnBlock", mock.Anything, mock.Anything).Return(false, nil)
return s
},
expectErr: true,
},
} {
t.Run(tc.name, func(t *testing.T) {
a := &shardingBlockLifecycleCallbackAdapter{
userID: userID,
logger: logger,
strategy: tc.shardingStrategy(),
}
err := a.PreAdd(meta)
if tc.expectErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
})
}
}

0 comments on commit 12412e6

Please sign in to comment.