Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce scylla dedup #4922

Merged
merged 29 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
c8f3c55
chore: refactor dedup package
cisse21 Jul 22, 2024
5348742
Merge branch 'master' into chore.refactorDedup
cisse21 Jul 23, 2024
847391f
Merge branch 'master' into chore.refactorDedup
cisse21 Jul 23, 2024
b76ea10
chore: merge master
cisse21 Jul 23, 2024
20d740d
feat: introduce scylla dedup
cisse21 Jul 23, 2024
efb7c8c
chore: review comments
cisse21 Jul 23, 2024
7bb2dd9
Merge branch 'chore.refactorDedup' into feat.introduceScyllaDedup
cisse21 Jul 23, 2024
35a0edf
chore: review comments
cisse21 Jul 23, 2024
e31db3d
Merge branch 'chore.refactorDedup' into feat.introduceScyllaDedup
cisse21 Jul 24, 2024
8e393ae
chore: refactor dedup package
cisse21 Jul 24, 2024
a2f3d27
chore: merge master
cisse21 Jul 25, 2024
e33dcca
Merge branch 'master' into feat.introduceScyllaDedup
cisse21 Jul 26, 2024
64fdfd5
chore: refactor dedup
cisse21 Jul 26, 2024
fae1a62
Merge branch 'master' into feat.introduceScyllaDedup
cisse21 Jul 29, 2024
0dfb323
chore: review comments
cisse21 Jul 29, 2024
5d2ee38
Merge branch 'master' into feat.introduceScyllaDedup
cisse21 Jul 29, 2024
6f24adb
chore: unit tests
cisse21 Jul 30, 2024
5f6197d
Merge branch 'master' into feat.introduceScyllaDedup
cisse21 Aug 2, 2024
d1470a7
chore: debug test
cisse21 Aug 2, 2024
bceb36f
Merge branch 'feat.introduceScyllaDedup' of github.com:rudderlabs/rud…
cisse21 Aug 2, 2024
41015c6
chore: run on ubuntu latest
cisse21 Aug 5, 2024
8a1aca7
chore: disable ipv6
cisse21 Aug 5, 2024
e5327dc
Merge branch 'master' into feat.introduceScyllaDedup
cisse21 Aug 5, 2024
1933c56
chore: add more tests
cisse21 Aug 5, 2024
69f170e
chore: review comments
cisse21 Aug 8, 2024
4edc80d
Apply suggestions from code review
cisse21 Aug 9, 2024
d44779f
chore: add more tests
cisse21 Aug 12, 2024
7fc3426
Merge branch 'master' into feat.introduceScyllaDedup
cisse21 Aug 13, 2024
1eba0cb
chore: review comments
cisse21 Aug 13, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 5 additions & 21 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,11 @@ concurrency:
jobs:
integration:
name: Integration
runs-on: 'ubuntu-20.04'
runs-on: ubuntu-latest
strategy:
matrix:
FEATURES: [ oss ,enterprise ]
steps:
- name: Disable IPv6 (temporary fix)
run: |
sudo sysctl -w net.ipv6.conf.all.disable_ipv6=1
sudo sysctl -w net.ipv6.conf.default.disable_ipv6=1
- name: Checkout
uses: actions/checkout@v4
- uses: actions/setup-go@v5
Expand All @@ -42,7 +38,7 @@ jobs:
RSERVER_ENABLE_MULTITENANCY: false
warehouse-integration:
name: Warehouse Integration
runs-on: 'ubuntu-20.04'
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
Expand All @@ -67,10 +63,6 @@ jobs:
- package: warehouse/integrations/snowflake
destination: snowflake
steps:
- name: Disable IPv6 (temporary fix)
run: |
sudo sysctl -w net.ipv6.conf.all.disable_ipv6=1
sudo sysctl -w net.ipv6.conf.default.disable_ipv6=1
- name: Checkout
uses: actions/checkout@v4
- name: Setup Go
Expand Down Expand Up @@ -107,12 +99,8 @@ jobs:
path: coverage.txt
unit:
name: Unit
runs-on: 'ubuntu-20.04'
runs-on: ubuntu-latest
steps:
- name: Disable IPv6 (temporary fix)
run: |
sudo sysctl -w net.ipv6.conf.all.disable_ipv6=1
sudo sysctl -w net.ipv6.conf.default.disable_ipv6=1
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
Expand All @@ -127,7 +115,7 @@ jobs:
path: coverage.txt
package-unit:
name: Package Unit
runs-on: 'ubuntu-20.04'
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
Expand All @@ -154,10 +142,6 @@ jobs:
exclude: services/rsources

steps:
- name: Disable IPv6 (temporary fix)
run: |
sudo sysctl -w net.ipv6.conf.all.disable_ipv6=1
sudo sysctl -w net.ipv6.conf.default.disable_ipv6=1
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
Expand Down Expand Up @@ -186,7 +170,7 @@ jobs:
path: coverage.txt
coverage:
name: Coverage
runs-on: 'ubuntu-20.04'
runs-on: ubuntu-latest
needs:
- warehouse-integration
- unit
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ replace (
github.com/cyphar/filepath-securejoin => github.com/cyphar/filepath-securejoin v0.2.5
github.com/gin-gonic/gin => github.com/gin-gonic/gin v1.10.0
github.com/go-jose/go-jose/v3 => github.com/go-jose/go-jose/v3 v3.0.3
github.com/gocql/gocql => github.com/scylladb/gocql v1.14.2
github.com/opencontainers/runc => github.com/opencontainers/runc v1.1.12
github.com/satori/go.uuid => github.com/satori/go.uuid v1.2.0
github.com/xitongsys/parquet-go => github.com/rudderlabs/parquet-go v0.0.2
Expand Down Expand Up @@ -45,6 +46,7 @@ require (
github.com/go-chi/chi/v5 v5.1.0
github.com/go-redis/redis v6.15.9+incompatible
github.com/go-redis/redis/v8 v8.11.5
github.com/gocql/gocql v1.14.2
github.com/golang-migrate/migrate/v4 v4.17.1
github.com/golang/mock v1.6.0
github.com/gomodule/redigo v1.9.2
Expand Down Expand Up @@ -222,6 +224,7 @@ require (
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.5 // indirect
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect
github.com/hamba/avro/v2 v2.22.2-0.20240625062549-66aad10411d9 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -321,12 +321,16 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bitfield/gotestdox v0.2.2 h1:x6RcPAbBbErKLnapz1QeAlf3ospg8efBsedU93CDsnE=
github.com/bitfield/gotestdox v0.2.2/go.mod h1:D+gwtS0urjBrzguAkTM2wodsTQYFHdpx8eqRJ3N+9pY=
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY=
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k=
github.com/bitly/go-simplejson v0.5.1 h1:xgwPbetQScXt1gh9BmoJ6j9JMr3TElvuIyjR8pgdoow=
github.com/bitly/go-simplejson v0.5.1/go.mod h1:YOPVLzCfwK14b4Sff3oP1AmGhI9T9Vsg84etUnlyp+Q=
github.com/bits-and-blooms/bitset v1.13.0 h1:bAQ9OPNFYbGHV6Nez0tmNI0RiEu7/hxlYJRUA0wFAVE=
github.com/bits-and-blooms/bitset v1.13.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/bkaradzic/go-lz4 v1.0.0 h1:RXc4wYsyz985CkXXeX04y4VnZFGG8Rd43pRaHsOXAKk=
github.com/bkaradzic/go-lz4 v1.0.0/go.mod h1:0YdlkowM3VswSROI7qDxhRvJ3sLhlFrRRwjwegp5jy4=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/bobg/gcsobj v0.1.2/go.mod h1:vS49EQ1A1Ib8FgrL58C8xXYZyOCR2TgzAdopy6/ipa8=
github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
github.com/bsm/ginkgo/v2 v2.7.0/go.mod h1:AiKlXPm7ItEHNc/2+OkrNG4E0ITzojb9/xWzvQ9XZ9w=
Expand Down Expand Up @@ -755,6 +759,8 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.21.0 h1:CWyXh/jylQWp2dtiV33mY4iSSp6
github.com/grpc-ecosystem/grpc-gateway/v2 v2.21.0/go.mod h1:nCLIt0w3Ept2NwF8ThLmrppXsfT07oC8k0XNDxd8sVU=
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU=
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4=
github.com/hamba/avro/v2 v2.22.2-0.20240625062549-66aad10411d9 h1:NEoabXt33PDWK4fXryK4e+XX+fSKDmmu9vg3yb9YI2M=
github.com/hamba/avro/v2 v2.22.2-0.20240625062549-66aad10411d9/go.mod h1:fQVdB2mFZBhPW1D5Abej41LMvrErARGrrdjOnKbm5yw=
github.com/hanwen/go-fuse v1.0.0/go.mod h1:unqXarDXqzAk0rt98O2tVndEPIpUgLD9+rwFisZH3Ok=
Expand Down Expand Up @@ -1147,6 +1153,8 @@ github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWR
github.com/samber/lo v1.46.0 h1:w8G+oaCPgz1PoCJztqymCFaKwXt+5cCXn51uPxExFfQ=
github.com/samber/lo v1.46.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/scylladb/gocql v1.14.2 h1:IBPtfJFcRDzifCjXYMtrZ14oQ7OqpqQjwITQCwtGZsc=
github.com/scylladb/gocql v1.14.2/go.mod h1:ZLEJ0EVE5JhmtxIW2stgHq/v1P4fWap0qyyXSKyV8K0=
github.com/scylladb/termtables v0.0.0-20191203121021-c4c0b6d42ff4/go.mod h1:C1a7PQSMz9NShzorzCiG2fk9+xuCgLkPeCvMHYR2OWg=
github.com/secure-systems-lab/go-securesystemslib v0.4.0 h1:b23VGrQhTA8cN2CbBw7/FulN9fTtqYUdS5+Oxzt+DUE=
github.com/secure-systems-lab/go-securesystemslib v0.4.0/go.mod h1:FGBZgq2tXWICsxWQW1msNf49F0Pf2Op5Htayx335Qbs=
Expand Down
12 changes: 6 additions & 6 deletions mocks/services/dedup/mock_dedup.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions processor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
proc.Handle.transformer = proc.Transformer
}

proc.Handle.Setup(
if err := proc.Handle.Setup(
proc.BackendConfig,
proc.gatewayDB,
proc.routerDB,
Expand All @@ -74,7 +74,9 @@
proc.transDebugger,
proc.enrichers,
proc.trackedUsersReporter,
)
); err != nil {
return err

Check warning on line 78 in processor/manager.go

View check run for this annotation

Codecov / codecov/patch

processor/manager.go#L78

Added line #L78 was not covered by tests
}

currentCtx, cancel := context.WithCancel(context.Background())
proc.currentCancel = cancel
Expand Down
11 changes: 8 additions & 3 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@
transDebugger transformationdebugger.TransformationDebugger,
enrichers []enricher.PipelineEnricher,
trackedUsersReporter trackedusers.UsersReporter,
) {
) error {
proc.reporting = reporting
proc.destDebugger = destDebugger
proc.transDebugger = transDebugger
Expand Down Expand Up @@ -614,7 +614,11 @@
})
}
if proc.config.enableDedup {
proc.dedup = dedup.New()
var err error
proc.dedup, err = dedup.New(proc.conf, proc.statsFactory)
if err != nil {
return err

Check warning on line 620 in processor/processor.go

View check run for this annotation

Codecov / codecov/patch

processor/processor.go#L620

Added line #L620 was not covered by tests
}
}
proc.sourceObservers = []sourceObserver{delayed.NewEventStats(proc.statsFactory, proc.conf)}
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -643,6 +647,7 @@
}))

proc.crashRecover()
return nil
}

func (proc *Handle) setupReloadableVars() {
Expand Down Expand Up @@ -1708,7 +1713,7 @@
p := payloadFunc()
messageSize := int64(len(p))
dedupKey := fmt.Sprintf("%v%v", messageId, eventParams.SourceJobRunId)
ok, previousSize, err := proc.dedup.Set(dedupTypes.KeyValue{Key: dedupKey, Value: messageSize})
ok, previousSize, err := proc.dedup.Get(dedupTypes.KeyValue{Key: dedupKey, Value: messageSize, WorkspaceId: batchEvent.WorkspaceId})
if err != nil {
panic(err)
}
Expand Down
29 changes: 16 additions & 13 deletions processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,11 @@ import (
"testing"
"time"

"github.com/samber/lo"

"github.com/rudderlabs/rudder-server/enterprise/trackedusers"

"github.com/google/uuid"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/format"
"github.com/samber/lo"
"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"
"github.com/tidwall/sjson"
Expand All @@ -32,6 +29,7 @@ import (

"github.com/rudderlabs/rudder-server/admin"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/enterprise/trackedusers"
"github.com/rudderlabs/rudder-server/internal/enricher"
"github.com/rudderlabs/rudder-server/jobsdb"
mocksBackendConfig "github.com/rudderlabs/rudder-server/mocks/backend-config"
Expand Down Expand Up @@ -78,7 +76,7 @@ type mockTrackedUsersReporter struct {
}
}

func (m *mockTrackedUsersReporter) ReportUsers(ctx context.Context, reports []*trackedusers.UsersReport, tx *Tx) error {
func (m *mockTrackedUsersReporter) ReportUsers(_ context.Context, reports []*trackedusers.UsersReport, _ *Tx) error {
m.reportCalls = append(m.reportCalls, struct {
reportedReports []*trackedusers.UsersReport
}{reportedReports: reports})
Expand Down Expand Up @@ -1874,7 +1872,7 @@ var _ = Describe("Processor", Ordered, func() {
// crash recover returns empty list
c.mockGatewayJobsDB.EXPECT().DeleteExecuting().Times(1)

processor.Setup(
err := processor.Setup(
c.mockBackendConfig,
c.mockGatewayJobsDB,
c.mockRouterJobsDB,
Expand All @@ -1893,6 +1891,7 @@ var _ = Describe("Processor", Ordered, func() {
[]enricher.PipelineEnricher{},
trackedusers.NewNoopDataCollector(),
)
Expect(err).To(BeNil())
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil())
Expand All @@ -1905,7 +1904,7 @@ var _ = Describe("Processor", Ordered, func() {

c.mockGatewayJobsDB.EXPECT().DeleteExecuting().Times(1)

processor.Setup(
err := processor.Setup(
c.mockBackendConfig,
c.mockGatewayJobsDB,
c.mockRouterJobsDB,
Expand All @@ -1924,6 +1923,7 @@ var _ = Describe("Processor", Ordered, func() {
[]enricher.PipelineEnricher{},
trackedusers.NewNoopDataCollector(),
)
Expect(err).To(BeNil())
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil())
Expand All @@ -1941,7 +1941,7 @@ var _ = Describe("Processor", Ordered, func() {

processor := prepareHandle(NewHandle(config.Default, mockTransformer))

processor.Setup(
err := processor.Setup(
c.mockBackendConfig,
c.mockGatewayJobsDB,
c.mockRouterJobsDB,
Expand All @@ -1960,6 +1960,7 @@ var _ = Describe("Processor", Ordered, func() {
[]enricher.PipelineEnricher{},
trackedusers.NewNoopDataCollector(),
)
Expect(err).To(BeNil())
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil())
Expand Down Expand Up @@ -2533,7 +2534,7 @@ var _ = Describe("Processor", Ordered, func() {
mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl)

callUnprocessed := c.mockGatewayJobsDB.EXPECT().GetUnprocessed(gomock.Any(), gomock.Any()).Return(jobsdb.JobsResult{Jobs: unprocessedJobsList}, nil).Times(1)
c.MockDedup.EXPECT().Set(gomock.Any()).Return(true, int64(0), nil).After(callUnprocessed).Times(3)
c.MockDedup.EXPECT().Get(gomock.Any()).Return(true, int64(0), nil).After(callUnprocessed).Times(3)
c.MockDedup.EXPECT().Commit(gomock.Any()).Times(1)

// We expect one transform call to destination A, after callUnprocessed.
Expand Down Expand Up @@ -3044,7 +3045,7 @@ var _ = Describe("Processor", Ordered, func() {

// crash recover returns empty list
c.mockGatewayJobsDB.EXPECT().DeleteExecuting().Times(1)
processor.Setup(
err := processor.Setup(
c.mockBackendConfig,
c.mockGatewayJobsDB,
c.mockRouterJobsDB,
Expand All @@ -3063,7 +3064,7 @@ var _ = Describe("Processor", Ordered, func() {
[]enricher.PipelineEnricher{},
trackedusers.NewNoopDataCollector(),
)

Expect(err).To(BeNil())
setMainLoopTimeout(processor, 1*time.Second)

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
Expand Down Expand Up @@ -3103,7 +3104,7 @@ var _ = Describe("Processor", Ordered, func() {

// crash recover returns empty list
c.mockGatewayJobsDB.EXPECT().DeleteExecuting().Times(1)
processor.Setup(
err := processor.Setup(
c.mockBackendConfig,
c.mockGatewayJobsDB,
c.mockRouterJobsDB,
Expand All @@ -3122,6 +3123,7 @@ var _ = Describe("Processor", Ordered, func() {
[]enricher.PipelineEnricher{},
trackedusers.NewNoopDataCollector(),
)
Expect(err).To(BeNil())
defer processor.Shutdown()

processor.config.readLoopSleep = config.SingleValueLoader(time.Millisecond)
Expand Down Expand Up @@ -5111,7 +5113,7 @@ func processorSetupAndAssertJobHandling(processor *Handle, c *testContext) {

func Setup(processor *Handle, c *testContext, enableDedup, enableReporting bool) {
setDisableDedupFeature(processor, enableDedup)
processor.Setup(
err := processor.Setup(
c.mockBackendConfig,
c.mockGatewayJobsDB,
c.mockRouterJobsDB,
Expand All @@ -5130,6 +5132,7 @@ func Setup(processor *Handle, c *testContext, enableDedup, enableReporting bool)
[]enricher.PipelineEnricher{},
trackedusers.NewNoopDataCollector(),
)
Expect(err).To(BeNil())
processor.reportingEnabled = enableReporting
processor.sourceObservers = []sourceObserver{c.MockObserver}
}
Expand Down
Loading
Loading