Skip to content

Commit

Permalink
chore: review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
cisse21 committed Aug 8, 2024
1 parent 1933c56 commit 69f170e
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 14 deletions.
36 changes: 24 additions & 12 deletions services/dedup/badger/badger.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,48 +170,60 @@ type Dedup struct {
}

func (d *Dedup) Get(kv types.KeyValue) (bool, int64, error) {
d.cacheMu.Lock()
defer d.cacheMu.Unlock()
err := d.badgerDB.init()
if err != nil {
return false, 0, err

Check warning on line 175 in services/dedup/badger/badger.go

View check run for this annotation

Codecov / codecov/patch

services/dedup/badger/badger.go#L175

Added line #L175 was not covered by tests
}
if previous, found := d.cache[kv.Key]; found {

d.cacheMu.Lock()
previous, found := d.cache[kv.Key]
d.cacheMu.Unlock()
if found {
return false, previous, nil
}
previous, found, err := d.badgerDB.Get(kv.Key)

previous, found, err = d.badgerDB.Get(kv.Key)
if err != nil {
return false, 0, err

Check warning on line 187 in services/dedup/badger/badger.go

View check run for this annotation

Codecov / codecov/patch

services/dedup/badger/badger.go#L187

Added line #L187 was not covered by tests
}
if !found {

d.cacheMu.Lock()
defer d.cacheMu.Unlock()
if !found { // still not in the cache, but it's in the DB so let's refresh the cache
d.cache[kv.Key] = kv.Value
}

return !found, previous, nil
}

func (d *Dedup) Commit(keys []string) error {
d.cacheMu.Lock()
defer d.cacheMu.Unlock()
err := d.badgerDB.init()
if err != nil {
return err

Check warning on line 202 in services/dedup/badger/badger.go

View check run for this annotation

Codecov / codecov/patch

services/dedup/badger/badger.go#L202

Added line #L202 was not covered by tests
}
kvs := make([]types.KeyValue, len(keys))
d.cacheMu.Lock()
for i, key := range keys {
value, ok := d.cache[key]
if !ok {
d.cacheMu.Unlock()
return fmt.Errorf("key %v has not been previously set", key)
}
kvs[i] = types.KeyValue{Key: key, Value: value}
}
d.cacheMu.Unlock()

err = d.badgerDB.Set(kvs)
if err == nil {
for _, kv := range kvs {
delete(d.cache, kv.Key)
}
if err != nil {
return err

Check warning on line 218 in services/dedup/badger/badger.go

View check run for this annotation

Codecov / codecov/patch

services/dedup/badger/badger.go#L218

Added line #L218 was not covered by tests
}
return err

d.cacheMu.Lock()
defer d.cacheMu.Unlock()
for _, kv := range kvs {
delete(d.cache, kv.Key)
}
return nil
}

func (d *Dedup) Close() {
Expand Down
8 changes: 6 additions & 2 deletions services/dedup/scylla/scylla.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package scylla

import (
"errors"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -63,7 +64,7 @@ func (d *ScyllaDB) Get(kv types.KeyValue) (bool, int64, error) {
// Check if the key exists in the DB
var value int64
err = d.scylla.Query(fmt.Sprintf("SELECT size FROM %s.%s WHERE id = ?", d.keyspace, kv.WorkspaceId), kv.Key).Scan(&value)
if err != nil && err != gocql.ErrNotFound {
if err != nil && !errors.Is(err, gocql.ErrNotFound) {
return false, 0, fmt.Errorf("error getting key %s: %v", kv.Key, err)

Check warning on line 68 in services/dedup/scylla/scylla.go

View check run for this annotation

Codecov / codecov/patch

services/dedup/scylla/scylla.go#L68

Added line #L68 was not covered by tests
}
exists := !(err == gocql.ErrNotFound)
Expand All @@ -75,15 +76,16 @@ func (d *ScyllaDB) Get(kv types.KeyValue) (bool, int64, error) {

func (d *ScyllaDB) Commit(keys []string) error {
d.cacheMu.Lock()
defer d.cacheMu.Unlock()
kvs := make([]types.KeyValue, len(keys))
for i, key := range keys {
value, ok := d.cache[key]
if !ok {
d.cacheMu.Unlock()
return fmt.Errorf("key %v has not been previously set", key)
}
kvs[i] = types.KeyValue{Key: key, Value: value.Value, WorkspaceId: value.WorkspaceId}
}
d.cacheMu.Unlock()
keysList := lo.PartitionBy(kvs, func(kv types.KeyValue) string {
return kv.WorkspaceId
})
Expand All @@ -100,9 +102,11 @@ func (d *ScyllaDB) Commit(keys []string) error {
if err := d.scylla.ExecuteBatch(scyllaBatch); err != nil {
return fmt.Errorf("error committing keys: %v", err)

Check warning on line 103 in services/dedup/scylla/scylla.go

View check run for this annotation

Codecov / codecov/patch

services/dedup/scylla/scylla.go#L103

Added line #L103 was not covered by tests
}
d.cacheMu.Lock()
for _, key := range batch {
delete(d.cache, key.Key)
}
d.cacheMu.Unlock()
}
}
return nil
Expand Down

0 comments on commit 69f170e

Please sign in to comment.