Skip to content

Commit

Permalink
chore: refactor dedup package
Browse files Browse the repository at this point in the history
  • Loading branch information
cisse21 committed Jul 24, 2024
1 parent e31db3d commit 8acf78e
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 83 deletions.
6 changes: 4 additions & 2 deletions processor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (proc *LifecycleManager) Start() error {
proc.Handle.transformer = proc.Transformer
}

proc.Handle.Setup(
if err := proc.Handle.Setup(
proc.BackendConfig,
proc.gatewayDB,
proc.routerDB,
Expand All @@ -74,7 +74,9 @@ func (proc *LifecycleManager) Start() error {
proc.transDebugger,
proc.enrichers,
proc.trackedUsersReporter,
)
); err != nil {
return err
}

currentCtx, cancel := context.WithCancel(context.Background())
proc.currentCancel = cancel
Expand Down
9 changes: 7 additions & 2 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func (proc *Handle) Setup(
transDebugger transformationdebugger.TransformationDebugger,
enrichers []enricher.PipelineEnricher,
trackedUsersReporter trackedusers.UsersReporter,
) {
) error {
proc.reporting = reporting
proc.destDebugger = destDebugger
proc.transDebugger = transDebugger
Expand Down Expand Up @@ -614,7 +614,11 @@ func (proc *Handle) Setup(
})
}
if proc.config.enableDedup {
proc.dedup = dedup.New(proc.conf, proc.statsFactory)
var err error
proc.dedup, err = dedup.New(proc.conf, proc.statsFactory)
if err != nil {
return err
}
}
proc.sourceObservers = []sourceObserver{delayed.NewEventStats(proc.statsFactory, proc.conf)}
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -643,6 +647,7 @@ func (proc *Handle) Setup(
}))

proc.crashRecover()
return nil
}

func (proc *Handle) setupReloadableVars() {
Expand Down
18 changes: 11 additions & 7 deletions processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1812,7 +1812,7 @@ var _ = Describe("Processor", Ordered, func() {
// crash recover returns empty list
c.mockGatewayJobsDB.EXPECT().DeleteExecuting().Times(1)

processor.Setup(
err := processor.Setup(
c.mockBackendConfig,
c.mockGatewayJobsDB,
c.mockRouterJobsDB,
Expand All @@ -1831,6 +1831,7 @@ var _ = Describe("Processor", Ordered, func() {
[]enricher.PipelineEnricher{},
trackedusers.NewNoopDataCollector(),
)
Expect(err).To(BeNil())
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil())
Expand All @@ -1843,7 +1844,7 @@ var _ = Describe("Processor", Ordered, func() {

c.mockGatewayJobsDB.EXPECT().DeleteExecuting().Times(1)

processor.Setup(
err := processor.Setup(
c.mockBackendConfig,
c.mockGatewayJobsDB,
c.mockRouterJobsDB,
Expand All @@ -1862,6 +1863,7 @@ var _ = Describe("Processor", Ordered, func() {
[]enricher.PipelineEnricher{},
trackedusers.NewNoopDataCollector(),
)
Expect(err).To(BeNil())
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil())
Expand All @@ -1879,7 +1881,7 @@ var _ = Describe("Processor", Ordered, func() {

processor := prepareHandle(NewHandle(config.Default, mockTransformer))

processor.Setup(
err := processor.Setup(
c.mockBackendConfig,
c.mockGatewayJobsDB,
c.mockRouterJobsDB,
Expand All @@ -1898,6 +1900,7 @@ var _ = Describe("Processor", Ordered, func() {
[]enricher.PipelineEnricher{},
trackedusers.NewNoopDataCollector(),
)
Expect(err).To(BeNil())
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil())
Expand Down Expand Up @@ -2982,7 +2985,7 @@ var _ = Describe("Processor", Ordered, func() {

// crash recover returns empty list
c.mockGatewayJobsDB.EXPECT().DeleteExecuting().Times(1)
processor.Setup(
err := processor.Setup(
c.mockBackendConfig,
c.mockGatewayJobsDB,
c.mockRouterJobsDB,
Expand All @@ -3001,7 +3004,7 @@ var _ = Describe("Processor", Ordered, func() {
[]enricher.PipelineEnricher{},
trackedusers.NewNoopDataCollector(),
)

Expect(err).To(BeNil())
setMainLoopTimeout(processor, 1*time.Second)

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
Expand Down Expand Up @@ -3041,7 +3044,7 @@ var _ = Describe("Processor", Ordered, func() {

// crash recover returns empty list
c.mockGatewayJobsDB.EXPECT().DeleteExecuting().Times(1)
processor.Setup(
err := processor.Setup(
c.mockBackendConfig,
c.mockGatewayJobsDB,
c.mockRouterJobsDB,
Expand All @@ -3060,6 +3063,7 @@ var _ = Describe("Processor", Ordered, func() {
[]enricher.PipelineEnricher{},
trackedusers.NewNoopDataCollector(),
)
Expect(err).To(BeNil())
defer processor.Shutdown()

processor.config.readLoopSleep = config.SingleValueLoader(time.Millisecond)
Expand Down Expand Up @@ -5049,7 +5053,7 @@ func processorSetupAndAssertJobHandling(processor *Handle, c *testContext) {

func Setup(processor *Handle, c *testContext, enableDedup, enableReporting bool) {
setDisableDedupFeature(processor, enableDedup)
processor.Setup(
_ = processor.Setup(
c.mockBackendConfig,
c.mockGatewayJobsDB,
c.mockRouterJobsDB,
Expand Down
55 changes: 53 additions & 2 deletions services/dedup/badger/badger.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func DefaultPath() string {
return fmt.Sprintf(`%v%v`, tmpDirPath, badgerPathName)
}

func NewBadgerDB(conf *config.Config, stats stats.Stats, path string) *BadgerDB {
func NewBadgerDB(conf *config.Config, stats stats.Stats, path string) *dedup {
dedupWindow := conf.GetReloadableDurationVar(3600, time.Second, "Dedup.dedupWindow", "Dedup.dedupWindowInS")

log := logger.NewLogger().Child("dedup")
Expand All @@ -67,7 +67,10 @@ func NewBadgerDB(conf *config.Config, stats stats.Stats, path string) *BadgerDB
window: dedupWindow,
opts: badgerOpts,
}
return db
return &dedup{
badgerDB: db,
cache: make(map[string]int64),
}
}

func (d *BadgerDB) Get(key string) (int64, bool, error) {
Expand Down Expand Up @@ -171,6 +174,54 @@ func (d *BadgerDB) gcLoop() {
}
}

// dedup pairs a BadgerDB with an in-memory map of keys that Set has accepted
// but Commit has not yet written to the database.
type dedup struct {
// badgerDB is the store that Set reads from and Commit writes to.
badgerDB *BadgerDB
// cacheMu is held by Set and Commit for their whole duration while they
// read or modify cache.
cacheMu sync.Mutex
// cache maps each key accepted by Set to its value until Commit persists
// it and removes the entry.
cache map[string]int64
}

// Set reports whether kv.Key is new, i.e. present neither in the pending
// in-memory cache nor in the badger DB.
//
// Returns:
//   - true when the key was unknown; the key/value pair is then remembered in
//     the cache until Commit is called for it.
//   - false together with the cached value when the key is already pending.
//   - false together with the value returned by the badger DB when the key is
//     already stored there.
//   - false, 0 and the error when the badger DB lookup fails.
func (d *dedup) Set(kv types.KeyValue) (bool, int64, error) {
	d.cacheMu.Lock()
	defer d.cacheMu.Unlock()

	// A key that is still waiting to be committed counts as a duplicate.
	if cached, pending := d.cache[kv.Key]; pending {
		return false, cached, nil
	}

	stored, exists, err := d.badgerDB.Get(kv.Key)
	switch {
	case err != nil:
		return false, 0, err
	case exists:
		return false, stored, nil
	default:
		// First sighting: remember the value so Commit can persist it later.
		d.cache[kv.Key] = kv.Value
		return true, stored, nil
	}
}

// Commit writes the cached values of the given keys to the badger DB and, on
// success, drops those keys from the in-memory cache.
//
// It returns an error without writing anything if any key is missing from the
// cache, and returns the badger DB error (leaving the cache untouched) if the
// write fails.
func (d *dedup) Commit(keys []string) error {
	d.cacheMu.Lock()
	defer d.cacheMu.Unlock()

	// Resolve every key first so that a missing key aborts before any write.
	pending := make([]types.KeyValue, 0, len(keys))
	for _, k := range keys {
		v, cached := d.cache[k]
		if !cached {
			return fmt.Errorf("key %v has not been previously set", k)
		}
		pending = append(pending, types.KeyValue{Key: k, Value: v})
	}

	if err := d.badgerDB.Set(pending); err != nil {
		return err
	}

	// Only forget the keys once they have been persisted.
	for _, entry := range pending {
		delete(d.cache, entry.Key)
	}
	return nil
}

// Close closes the underlying badger DB by delegating to badgerDB.Close.
// It does not touch the in-memory cache.
func (d *dedup) Close() {
d.badgerDB.Close()
}

type loggerForBadger struct {
logger.Logger
}
Expand Down
77 changes: 12 additions & 65 deletions services/dedup/dedup.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,14 @@
package dedup

import (
"fmt"
"sync"

"github.com/rudderlabs/rudder-go-kit/stats"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-server/services/dedup/badger"
"github.com/rudderlabs/rudder-server/services/dedup/scylla"
"github.com/rudderlabs/rudder-server/services/dedup/types"
)

type Mode string // skipcq: RVV-B0009
type Mode string

const (
Badger Mode = "Badger"
Expand All @@ -22,23 +19,21 @@ const (
)

// New creates a new deduplication service. The service needs to be closed after use.
func New(conf *config.Config, stats stats.Stats) Dedup {
func New(conf *config.Config, stats stats.Stats) (Dedup, error) {
mode := Mode(config.GetString("Dedup.Mode", string(Badger)))
switch mode {
case Badger:
return &dedup{
badgerDB: badger.NewBadgerDB(conf, stats, badger.DefaultPath()),
cache: make(map[string]int64),
}
return badger.NewBadgerDB(conf, stats, badger.DefaultPath()), nil
case Scylla:
return nil
scylla, err := scylla.New(conf, stats)
if err != nil {
return nil, err
}
return scylla, nil
case Dual:
return nil
return nil, nil

Check failure on line 34 in services/dedup/dedup.go

View workflow job for this annotation

GitHub Actions / lint

return both the `nil` error and invalid value: use a sentinel error instead (nilnil)
default:
return &dedup{
badgerDB: badger.NewBadgerDB(conf, stats, badger.DefaultPath()),
cache: make(map[string]int64),
}
return badger.NewBadgerDB(conf, stats, badger.DefaultPath()), nil
}
}

Expand All @@ -53,51 +48,3 @@ type Dedup interface {
// Close closes the deduplication service
Close()
}

// dedup pairs a badger.BadgerDB with an in-memory map of keys that Set has
// accepted but Commit has not yet written to the database.
type dedup struct {
// badgerDB is the store that Set reads from and Commit writes to.
badgerDB *badger.BadgerDB
// cacheMu is held by Set and Commit for their whole duration while they
// read or modify cache.
cacheMu sync.Mutex
// cache maps each key accepted by Set to its value until Commit persists
// it and removes the entry.
cache map[string]int64
}

// Set checks whether kv.Key has been seen before, looking first at the
// pending in-memory cache and then at the badger DB.
//
// The boolean result is true only when the key is found in neither place; in
// that case the pair is kept in the cache until Commit is called. The int64
// result is the cached value for a pending key, otherwise the value returned
// by the badger DB lookup. A lookup failure yields false, 0 and the error.
func (d *dedup) Set(kv types.KeyValue) (bool, int64, error) {
	d.cacheMu.Lock()
	defer d.cacheMu.Unlock()

	key := kv.Key
	if pendingValue, isPending := d.cache[key]; isPending {
		return false, pendingValue, nil
	}

	persisted, inStore, err := d.badgerDB.Get(key)
	if err != nil {
		return false, 0, err
	}
	if inStore {
		return false, persisted, nil
	}

	// Unknown key: hold on to its value until it is committed.
	d.cache[key] = kv.Value
	return true, persisted, nil
}

// Commit persists the cached values for keys into the badger DB.
//
// Every key must have been accepted by Set beforehand; otherwise an error is
// returned and nothing is written. When the write succeeds the keys are
// removed from the cache; when it fails the cache is left as is and the
// badger DB error is returned.
func (d *dedup) Commit(keys []string) error {
	d.cacheMu.Lock()
	defer d.cacheMu.Unlock()

	batch := make([]types.KeyValue, 0, len(keys))
	for _, name := range keys {
		val, known := d.cache[name]
		if !known {
			return fmt.Errorf("key %v has not been previously set", name)
		}
		batch = append(batch, types.KeyValue{Key: name, Value: val})
	}

	writeErr := d.badgerDB.Set(batch)
	if writeErr != nil {
		return writeErr
	}

	// The batch is durable now, so the pending entries can be dropped.
	for i := range batch {
		delete(d.cache, batch[i].Key)
	}
	return nil
}

// Close closes the underlying badger DB by delegating to badgerDB.Close.
// It does not touch the in-memory cache.
func (d *dedup) Close() {
d.badgerDB.Close()
}
14 changes: 9 additions & 5 deletions services/dedup/dedup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ func Test_Dedup(t *testing.T) {
dbPath := os.TempDir() + "/dedup_test"
defer func() { _ = os.RemoveAll(dbPath) }()
_ = os.RemoveAll(dbPath)
d := dedup.New(config.New(), stats.Default)
d, err := dedup.New(config.New(), stats.Default)
require.Nil(t, err)
defer d.Close()

t.Run("if message id is not present in cache and badger db", func(t *testing.T) {
Expand Down Expand Up @@ -77,7 +78,8 @@ func Test_Dedup_Window(t *testing.T) {
defer func() { _ = os.RemoveAll(dbPath) }()
_ = os.RemoveAll(dbPath)
config.Set("Dedup.dedupWindow", "1s")
d := dedup.New(config.New(), stats.Default)
d, err := dedup.New(config.New(), stats.Default)
require.Nil(t, err)
defer d.Close()

found, _, err := d.Set(types.KeyValue{Key: "to be deleted", Value: 1})
Expand Down Expand Up @@ -106,7 +108,8 @@ func Test_Dedup_ErrTxnTooBig(t *testing.T) {
dbPath := os.TempDir() + "/dedup_test_errtxntoobig"
defer os.RemoveAll(dbPath)
os.RemoveAll(dbPath)
d := dedup.New(config.New(), stats.Default)
d, err := dedup.New(config.New(), stats.Default)
require.Nil(t, err)
defer d.Close()

size := 105_000
Expand All @@ -115,7 +118,7 @@ func Test_Dedup_ErrTxnTooBig(t *testing.T) {
messages[i] = uuid.New().String()
_, _, _ = d.Set(types.KeyValue{Key: messages[i], Value: int64(i + 1)})
}
err := d.Commit(messages)
err = d.Commit(messages)
require.NoError(t, err)
}

Expand All @@ -128,7 +131,8 @@ func Benchmark_Dedup(b *testing.B) {
b.Logf("using path %s, since tmpDir has issues in macOS\n", dbPath)
defer func() { _ = os.RemoveAll(dbPath) }()
_ = os.MkdirAll(dbPath, 0o750)
d := dedup.New(config.New(), stats.Default)
d, err := dedup.New(config.New(), stats.Default)
require.NoError(b, err)

b.Run("no duplicates 1000 batch unique", func(b *testing.B) {
batchSize := 1000
Expand Down
Loading

0 comments on commit 8acf78e

Please sign in to comment.