diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index 5b6a3e07b8..b18fa8b07a 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -627,6 +627,11 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro store.WithLazyExpandedPostings(u.cfg.BucketStore.LazyExpandedPostingsEnabled), store.WithPostingGroupMaxKeySeriesRatio(u.cfg.BucketStore.LazyExpandedPostingGroupMaxKeySeriesRatio), store.WithDontResort(true), // Cortex doesn't need to resort series in store gateway. + store.WithBlockLifecycleCallback(&shardingBlockLifecycleCallbackAdapter{ + userID: userID, + strategy: u.shardingStrategy, + logger: userLogger, + }), } if u.logLevel.String() == "debug" { bucketStoreOpts = append(bucketStoreOpts, store.WithDebugLogging()) diff --git a/pkg/storegateway/bucket_stores_test.go b/pkg/storegateway/bucket_stores_test.go index cd6baa146a..d5d7f09e37 100644 --- a/pkg/storegateway/bucket_stores_test.go +++ b/pkg/storegateway/bucket_stores_test.go @@ -889,6 +889,14 @@ func (u *userShardingStrategy) FilterBlocks(ctx context.Context, userID string, return nil } +func (u *userShardingStrategy) OwnBlock(userID string, _ thanos_metadata.Meta) (bool, error) { + if util.StringsContain(u.users, userID) { + return true, nil + } + + return false, nil +} + // failFirstGetBucket is an objstore.Bucket wrapper which fails the first Get() request with a mocked error. type failFirstGetBucket struct { objstore.Bucket diff --git a/pkg/storegateway/gateway_test.go b/pkg/storegateway/gateway_test.go index 3102638192..d35f3677b4 100644 --- a/pkg/storegateway/gateway_test.go +++ b/pkg/storegateway/gateway_test.go @@ -1307,6 +1307,11 @@ func (m *mockShardingStrategy) FilterBlocks(ctx context.Context, userID string, return args.Error(0) } +func (m *mockShardingStrategy) OwnBlock(userID string, meta metadata.Meta) (bool, error) { + args := m.Called(userID, meta) + return args.Bool(0), args.Error(1) +} + func createBucketIndex(t *testing.T, bkt objstore.Bucket, userID string) *bucketindex.Index { updater := bucketindex.NewUpdater(bkt, userID, nil, log.NewNopLogger()) idx, _, _, err := updater.UpdateIndex(context.Background(), nil) diff --git a/pkg/storegateway/sharding_strategy.go b/pkg/storegateway/sharding_strategy.go index cac1b63ba0..43df0584be 100644 --- a/pkg/storegateway/sharding_strategy.go +++ b/pkg/storegateway/sharding_strategy.go @@ -2,6 +2,7 @@ package storegateway import ( "context" + "errors" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -19,6 +20,10 @@ const ( shardExcludedMeta = "shard-excluded" ) +var ( + errBlockNotOwned = errors.New("block not owned") +) + type ShardingStrategy interface { // FilterUsers whose blocks should be loaded by the store-gateway. Returns the list of user IDs // that should be synced by the store-gateway. @@ -28,6 +33,9 @@ type ShardingStrategy interface { // The provided loaded map contains blocks which have been previously returned by this function and // are now loaded or loading in the store-gateway. FilterBlocks(ctx context.Context, userID string, metas map[ulid.ULID]*metadata.Meta, loaded map[ulid.ULID]struct{}, synced block.GaugeVec) error + + // OwnBlock checks if the block is owned by the current instance. + OwnBlock(userID string, meta metadata.Meta) (bool, error) } // ShardingLimits is the interface that should be implemented by the limits provider, @@ -71,6 +79,10 @@ func (s *NoShardingStrategy) FilterBlocks(_ context.Context, _ string, _ map[uli return nil } +func (s *NoShardingStrategy) OwnBlock(_ string, meta metadata.Meta) (bool, error) { + return true, nil +} + // DefaultShardingStrategy is a sharding strategy based on the hash ring formed by store-gateways. // Not go-routine safe. type DefaultShardingStrategy struct { @@ -102,6 +114,17 @@ func (s *DefaultShardingStrategy) FilterBlocks(_ context.Context, _ string, meta return nil } +func (s *DefaultShardingStrategy) OwnBlock(_ string, meta metadata.Meta) (bool, error) { + key := cortex_tsdb.HashBlockID(meta.ULID) + + // Check if the block is owned by the store-gateway + set, err := s.r.Get(key, BlocksOwnerSync, nil, nil, nil) + if err != nil { + return false, err + } + return set.Includes(s.instanceAddr), nil +} + // ShuffleShardingStrategy is a shuffle sharding strategy, based on the hash ring formed by store-gateways, // where each tenant blocks are sharded across a subset of store-gateway instances. type ShuffleShardingStrategy struct { @@ -151,6 +174,18 @@ func (s *ShuffleShardingStrategy) FilterBlocks(_ context.Context, userID string, return nil } +func (s *ShuffleShardingStrategy) OwnBlock(userID string, meta metadata.Meta) (bool, error) { + subRing := GetShuffleShardingSubring(s.r, userID, s.limits, s.zoneStableShuffleSharding) + key := cortex_tsdb.HashBlockID(meta.ULID) + + // Check if the block is owned by the store-gateway + set, err := subRing.Get(key, BlocksOwnerSync, nil, nil, nil) + if err != nil { + return false, err + } + return set.Includes(s.instanceAddr), nil +} + func filterBlocksByRingSharding(r ring.ReadRing, instanceAddr string, metas map[ulid.ULID]*metadata.Meta, loaded map[ulid.ULID]struct{}, synced block.GaugeVec, logger log.Logger) { bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet() @@ -275,3 +310,20 @@ func (a *shardingBucketReaderAdapter) Iter(ctx context.Context, dir string, f fu return a.InstrumentedBucketReader.Iter(ctx, dir, f, options...) } + +type shardingBlockLifecycleCallbackAdapter struct { + userID string + strategy ShardingStrategy + logger log.Logger +} + +func (a *shardingBlockLifecycleCallbackAdapter) PreAdd(meta metadata.Meta) error { + own, err := a.strategy.OwnBlock(a.userID, meta) + // If unable to check if block is owned or not because of ring error, mark it as owned + // and ignore the error. + if err != nil || own { + return nil + } + level.Info(a.logger).Log("msg", "block not owned from pre check", "block", meta.ULID.String()) + return errBlockNotOwned +} diff --git a/pkg/storegateway/sharding_strategy_test.go b/pkg/storegateway/sharding_strategy_test.go index f2ee50fb4c..1da65028f5 100644 --- a/pkg/storegateway/sharding_strategy_test.go +++ b/pkg/storegateway/sharding_strategy_test.go @@ -2,6 +2,7 @@ package storegateway import ( "context" + "errors" "fmt" "strconv" "testing" @@ -11,7 +12,9 @@ import ( "github.com/oklog/ulid" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/prometheus/tsdb" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/extprom" @@ -272,6 +275,11 @@ func TestDefaultShardingStrategy(t *testing.T) { for instanceAddr, expectedBlocks := range testData.expectedBlocks { filter := NewDefaultShardingStrategy(r, instanceAddr, log.NewNopLogger(), nil) + for _, block := range expectedBlocks { + owned, err := filter.OwnBlock("user-1", metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block}}) + require.NoError(t, err) + require.True(t, owned) + } synced := extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{}, []string{"state"}) synced.WithLabelValues(shardExcludedMeta).Set(0) @@ -657,6 +665,11 @@ func TestShuffleShardingStrategy(t *testing.T) { // Assert on filter blocks. for _, expected := range testData.expectedBlocks { filter := NewShuffleShardingStrategy(r, expected.instanceID, expected.instanceAddr, testData.limits, log.NewNopLogger(), allowedTenants, zoneStableShuffleSharding) //nolint:govet + for _, block := range expected.blocks { + owned, err := filter.OwnBlock(userID, metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block}}) + require.NoError(t, err) + require.True(t, owned) + } synced := extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{}, []string{"state"}) synced.WithLabelValues(shardExcludedMeta).Set(0) @@ -693,3 +706,159 @@ type shardingLimitsMock struct { func (m *shardingLimitsMock) StoreGatewayTenantShardSize(_ string) float64 { return m.storeGatewayTenantShardSize } + +func TestDefaultShardingStrategy_OwnBlock(t *testing.T) { + t.Parallel() + // The following block IDs have been picked to have increasing hash values + // in order to simplify the tests. + block1 := ulid.MustNew(1, nil) // hash: 283204220 + block2 := ulid.MustNew(2, nil) + block1Hash := cortex_tsdb.HashBlockID(block1) + registeredAt := time.Now() + block2Hash := cortex_tsdb.HashBlockID(block2) + + ctx := context.Background() + store, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + + // Initialize the ring state. + require.NoError(t, store.CAS(ctx, "test", func(in interface{}) (interface{}, bool, error) { + d := ring.NewDesc() + d.AddIngester("instance-1", "127.0.0.1", "zone-a", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-2", "127.0.0.2", "zone-b", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt) + return d, true, nil + })) + + cfg := ring.Config{ + ReplicationFactor: 1, + HeartbeatTimeout: time.Minute, + ZoneAwarenessEnabled: true, + } + + r, err := ring.NewWithStoreClientAndStrategy(cfg, "test", "test", store, ring.NewIgnoreUnhealthyInstancesReplicationStrategy(), nil, nil) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(ctx, r)) + defer services.StopAndAwaitTerminated(ctx, r) //nolint:errcheck + + // Wait until the ring client has synced. + require.NoError(t, ring.WaitInstanceState(ctx, r, "instance-1", ring.ACTIVE)) + filter := NewDefaultShardingStrategy(r, "127.0.0.1", log.NewNopLogger(), nil) + owned, err := filter.OwnBlock("", metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}) + require.NoError(t, err) + require.True(t, owned) + // Owned by 127.0.0.2 + owned, err = filter.OwnBlock("", metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}) + require.NoError(t, err) + require.False(t, owned) + + filter2 := NewDefaultShardingStrategy(r, "127.0.0.2", log.NewNopLogger(), nil) + owned, err = filter2.OwnBlock("", metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}) + require.NoError(t, err) + require.True(t, owned) +} + +func TestShuffleShardingStrategy_OwnBlock(t *testing.T) { + t.Parallel() + // The following block IDs have been picked to have increasing hash values + // in order to simplify the tests. + block1 := ulid.MustNew(1, nil) // hash: 283204220 + block2 := ulid.MustNew(2, nil) + block1Hash := cortex_tsdb.HashBlockID(block1) + registeredAt := time.Now() + block2Hash := cortex_tsdb.HashBlockID(block2) + + ctx := context.Background() + store, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + + // Initialize the ring state. + require.NoError(t, store.CAS(ctx, "test", func(in interface{}) (interface{}, bool, error) { + d := ring.NewDesc() + d.AddIngester("instance-1", "127.0.0.1", "zone-a", []uint32{block1Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-2", "127.0.0.2", "zone-b", []uint32{block2Hash + 1}, ring.ACTIVE, registeredAt) + d.AddIngester("instance-3", "127.0.0.3", "zone-c", []uint32{block2Hash + 2}, ring.ACTIVE, registeredAt) + return d, true, nil + })) + + cfg := ring.Config{ + ReplicationFactor: 1, + HeartbeatTimeout: time.Minute, + ZoneAwarenessEnabled: true, + } + limits := &shardingLimitsMock{storeGatewayTenantShardSize: 2} + + r, err := ring.NewWithStoreClientAndStrategy(cfg, "test", "test", store, ring.NewIgnoreUnhealthyInstancesReplicationStrategy(), nil, nil) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(ctx, r)) + defer services.StopAndAwaitTerminated(ctx, r) //nolint:errcheck + + // Wait until the ring client has synced. + require.NoError(t, ring.WaitInstanceState(ctx, r, "instance-1", ring.ACTIVE)) + filter := NewShuffleShardingStrategy(r, "instance-1", "127.0.0.1", limits, log.NewNopLogger(), nil, true) + filter2 := NewShuffleShardingStrategy(r, "instance-2", "127.0.0.2", limits, log.NewNopLogger(), nil, true) + + owned, err := filter.OwnBlock("user-1", metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block1}}) + require.NoError(t, err) + require.True(t, owned) + // Owned by 127.0.0.2 + owned, err = filter.OwnBlock("user-1", metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}) + require.NoError(t, err) + require.False(t, owned) + + owned, err = filter2.OwnBlock("user-1", metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block2}}) + require.NoError(t, err) + require.True(t, owned) +} + +func TestShardingBlockLifecycleCallbackAdapter(t *testing.T) { + userID := "user-1" + logger := log.NewNopLogger() + block := ulid.MustNew(1, nil) + meta := metadata.Meta{BlockMeta: tsdb.BlockMeta{ULID: block}} + + for _, tc := range []struct { + name string + shardingStrategy func() ShardingStrategy + expectErr bool + }{ + { + name: "own block", + shardingStrategy: func() ShardingStrategy { + s := &mockShardingStrategy{} + s.On("OwnBlock", mock.Anything, mock.Anything).Return(true, nil) + return s + }, + }, + { + name: "own block has error, still own block", + shardingStrategy: func() ShardingStrategy { + s := &mockShardingStrategy{} + s.On("OwnBlock", mock.Anything, mock.Anything).Return(false, errors.New("some error")) + return s + }, + }, + { + name: "not own block", + shardingStrategy: func() ShardingStrategy { + s := &mockShardingStrategy{} + s.On("OwnBlock", mock.Anything, mock.Anything).Return(false, nil) + return s + }, + expectErr: true, + }, + } { + t.Run(tc.name, func(t *testing.T) { + a := &shardingBlockLifecycleCallbackAdapter{ + userID: userID, + logger: logger, + strategy: tc.shardingStrategy(), + } + err := a.PreAdd(meta) + if tc.expectErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +}