-
Notifications
You must be signed in to change notification settings - Fork 320
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: introduce scylla dedup #4922
Conversation
Important Review skippedAuto reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the WalkthroughThe recent changes enhance the deduplication service across multiple components by modifying method signatures for better error handling and data management. Key functions now accept structured data types, such as maps instead of slices, allowing for more efficient key-value pair handling. Additionally, new structs and methods were introduced to improve the integration between different database services, promoting a robust architecture for maintaining data consistency. Changes
Sequence Diagram(s)sequenceDiagram
participant C as Client
participant S as Service
participant DB as Database
C->>S: Call Commit(keys)
S->>DB: Store key-value pairs
DB-->>S: Acknowledge success
S-->>C: Return success
sequenceDiagram
participant C as Client
participant S as Service
participant M as Manager
C->>M: Start process
M->>S: Call Setup()
alt Success
S-->>M: Setup successful
M-->>C: Process started
else Failure
S-->>M: Error
M-->>C: Return error
end
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (invoked as PR comments)
Additionally, you can add CodeRabbit Configuration File (
|
229a938
to
7bb2dd9
Compare
efb7c8c
to
35a0edf
Compare
21ba90a
to
e31686d
Compare
8acf78e
to
d15f936
Compare
d15f936
to
8e393ae
Compare
31cbcff
to
a2f3d27
Compare
ff5fd37
to
0a698ae
Compare
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #4922 +/- ##
==========================================
+ Coverage 74.37% 74.40% +0.02%
==========================================
Files 428 431 +3
Lines 49909 50053 +144
==========================================
+ Hits 37119 37241 +122
- Misses 10340 10352 +12
- Partials 2450 2460 +10 ☔ View full report in Codecov by Sentry. |
1a20fb3
to
2a083d2
Compare
2a083d2
to
69f170e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not very sure on mirror scylla and mirror badger implementations.
Co-authored-by: Mihir Gandhi <[email protected]>
0c3f0d7
to
4edc80d
Compare
} | ||
|
||
func (mb *MirrorBadger) Get(kv types.KeyValue) (bool, int64, error) { | ||
_, _, _ = mb.scylla.Get(kv) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if we handle the error here? In worst case scylla will get populated and badger will not, anyways we are throwing a panic in processor in such cases so everything will be retried.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should be OK given that scylla with this configuration is not essential. Still, we want to know if mirroring isn't working, right? We could start with logging these. Wdyt?
services/dedup/types/types.go
Outdated
Value int64 | ||
Key string | ||
Value int64 | ||
WorkspaceId string |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WorkspaceId string | |
WorkspaceID string |
services/dedup/scylla/scylla_test.go
Outdated
require.Nil(t, err) | ||
require.False(t, found) | ||
}) | ||
t.Run("Same messageID should be deduped for same workspace from cache", func(t *testing.T) { | ||
key1 := types.KeyValue{Key: "b", Value: 1, WorkspaceId: "test"} | ||
key2 := types.KeyValue{Key: "b", Value: 1, WorkspaceId: "test"} | ||
found, _, err := scylla.Get(key1) | ||
require.Nil(t, err) | ||
require.True(t, found) | ||
found, _, err = scylla.Get(key2) | ||
require.Nil(t, err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nit-pick] for consistency and improved semantics
require.Nil(t, err) | |
require.False(t, found) | |
}) | |
t.Run("Same messageID should be deduped for same workspace from cache", func(t *testing.T) { | |
key1 := types.KeyValue{Key: "b", Value: 1, WorkspaceId: "test"} | |
key2 := types.KeyValue{Key: "b", Value: 1, WorkspaceId: "test"} | |
found, _, err := scylla.Get(key1) | |
require.Nil(t, err) | |
require.True(t, found) | |
found, _, err = scylla.Get(key2) | |
require.Nil(t, err) | |
require.NoError(t, err) | |
require.False(t, found) | |
}) | |
t.Run("Same messageID should be deduped for same workspace from cache", func(t *testing.T) { | |
key1 := types.KeyValue{Key: "b", Value: 1, WorkspaceId: "test"} | |
key2 := types.KeyValue{Key: "b", Value: 1, WorkspaceId: "test"} | |
found, _, err := scylla.Get(key1) | |
require.NoError(t, err) | |
require.True(t, found) | |
found, _, err = scylla.Get(key2) | |
require.NoError(t, err) |
}) | ||
} | ||
if err := d.scylla.ExecuteBatch(scyllaBatch); err != nil { | ||
return fmt.Errorf("error committing keys: %v", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[minor]
return fmt.Errorf("error committing keys: %v", err) | |
return fmt.Errorf("committing keys: %v", err) |
services/dedup/scylla/scylla.go
Outdated
d.createTableMap[kv.WorkspaceId] = &sync.Once{} | ||
once = d.createTableMap[kv.WorkspaceId] | ||
} | ||
d.createTableMu.Unlock() | ||
once.Do(func() { | ||
query := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s.%s (id text PRIMARY KEY,size bigint) WITH bloom_filter_fp_chance = 0.005;", d.keyspace, kv.WorkspaceId) | ||
err = d.scylla.Query(query).Exec() | ||
}) | ||
if err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can use sync.OnceValue to store and reuse the error.
In the existing code, in case of an error, only the first goroutine will return the error.
d.createTableMap[kv.WorkspaceId] = &sync.Once{} | |
once = d.createTableMap[kv.WorkspaceId] | |
} | |
d.createTableMu.Unlock() | |
once.Do(func() { | |
query := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s.%s (id text PRIMARY KEY,size bigint) WITH bloom_filter_fp_chance = 0.005;", d.keyspace, kv.WorkspaceId) | |
err = d.scylla.Query(query).Exec() | |
}) | |
if err != nil { | |
d.createTableMap[kv.WorkspaceId] = &sync.OnceValue[error]{} | |
once = d.createTableMap[kv.WorkspaceId] | |
} | |
d.createTableMu.Unlock() | |
err := once.Do(func() error { | |
query := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s.%s (id text PRIMARY KEY,size bigint) WITH bloom_filter_fp_chance = 0.005;", d.keyspace, kv.WorkspaceId) | |
return d.scylla.Query(query).Exec() | |
}) | |
if err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, you might want to move this logic to a separate unexported method, and call it even from Commit
. It depends on how robust you want to make your API, normally Get for a workspace should always happen before a commit.
log := logger.NewLogger().Child("dedup") | ||
func NewBadgerDB(conf *config.Config, stats stats.Stats, path string) *Dedup { | ||
dedupWindow := conf.GetReloadableDurationVar(3600, time.Second, "Dedup.dedupWindow", "Dedup.dedupWindowInS") | ||
log := logger.NewLogger().Child("Dedup") | ||
badgerOpts := badger. | ||
DefaultOptions(path). | ||
WithCompression(options.None). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[question] Out of curiosity... did you test this with compression at all? If yes, what did you observe?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was an already existing code and I didn't make any changes in the existing functionality
log := logger.NewLogger().Child("dedup") | ||
func NewBadgerDB(conf *config.Config, stats stats.Stats, path string) *Dedup { | ||
dedupWindow := conf.GetReloadableDurationVar(3600, time.Second, "Dedup.dedupWindow", "Dedup.dedupWindowInS") | ||
log := logger.NewLogger().Child("Dedup") | ||
badgerOpts := badger. | ||
DefaultOptions(path). | ||
WithCompression(options.None). | ||
WithIndexCacheSize(16 << 20). // 16mb | ||
WithNumGoroutines(1). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[question] Could increasing this number help?
return 0, false, err | ||
} | ||
err = d.badgerDB.View(func(txn *badger.Txn) error { | ||
err := d.badgerDB.View(func(txn *badger.Txn) error { | ||
item, err := txn.Get([]byte(key)) | ||
if err != nil { | ||
return err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[question on line 83] Probably not related to this PR but it seems weird so I'll ask.
Why do we do this?
payloadSize, _ = strconv.ParseInt(string(itemValue), 10, 64)
found = true
Wouldn't it make more sense to do this instead?
payloadSize, err = strconv.ParseInt(string(itemValue), 10, 64)
if err == nil {
found = true
}
I'm saying this, because if we can get the value from the database but it cannot be parsed then we wrote something invalid. We might as well say we couldn't find it? 🤔 WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this was done because we care more whether the key was present or not and not the value for the key
logger.Reset() | ||
misc.Init() | ||
|
||
dbPath := os.TempDir() + "/dedup_test" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dbPath := os.TempDir() + "/dedup_test" | |
dbPath := t.TempDir() + "/dedup_test" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It checks for existence as well so you could remove the _ = os.RemoveAll(dbPath)
that you have on line 27.
It should also do the cleanup for you if I'm not mistaken:
c.tempDir, c.tempDirErr = os.MkdirTemp("", pattern)
if c.tempDirErr == nil {
c.Cleanup(func() {
if err := removeAll(c.tempDir); err != nil {
c.Errorf("TempDir RemoveAll cleanup: %v", err)
}
})
}
dbPath := os.TempDir() + "/dedup_test" | ||
defer func() { _ = os.RemoveAll(dbPath) }() | ||
_ = os.RemoveAll(dbPath) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment as above. Could use t.TempDir()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Approved but I left a few questions and a suggestion for your tests 👍
Description
Linear Ticket
Fixes PIPE-1354
Security