From bfe338a858d72019e8eeb37b936390348987af00 Mon Sep 17 00:00:00 2001 From: Peng Zhu Date: Fri, 8 Mar 2024 00:35:00 -0800 Subject: [PATCH 01/15] [extension/ackextension] Implement in-memory ack extension --- .chloggen/ack_extension_impl.yaml | 27 ++++++++ extension/ackextension/ackextension.go | 2 +- extension/ackextension/inmemory.go | 83 +++++++++++++++++++++++++ extension/ackextension/inmemory_test.go | 75 ++++++++++++++++++++++ 4 files changed, 186 insertions(+), 1 deletion(-) create mode 100755 .chloggen/ack_extension_impl.yaml create mode 100644 extension/ackextension/inmemory.go create mode 100644 extension/ackextension/inmemory_test.go diff --git a/.chloggen/ack_extension_impl.yaml b/.chloggen/ack_extension_impl.yaml new file mode 100755 index 000000000000..c7c0b0ff3362 --- /dev/null +++ b/.chloggen/ack_extension_impl.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: ackextension + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: adding the in-memory implementation of the ackextension + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [26376] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/extension/ackextension/ackextension.go b/extension/ackextension/ackextension.go index bd5159a78265..f337d1bc9a2f 100644 --- a/extension/ackextension/ackextension.go +++ b/extension/ackextension/ackextension.go @@ -7,7 +7,7 @@ package ackextension // import "github.com/open-telemetry/opentelemetry-collecto // to check the status of given ack ids. type AckExtension interface { // ProcessEvent marks the beginning of processing an event. It generates an ack ID for the associated partition ID. - // ACK IDs are only unique within a partition. Two partitions can have the same ACK IDs but they are generated for different events. + // ACK IDs are only unique within a partition. Two partitions can have the same ACK IDs, but they are generated for different events. ProcessEvent(partitionID string) (ackID uint64) // Ack acknowledges an event has been processed. diff --git a/extension/ackextension/inmemory.go b/extension/ackextension/inmemory.go new file mode 100644 index 000000000000..63d98f866e8b --- /dev/null +++ b/extension/ackextension/inmemory.go @@ -0,0 +1,83 @@ +package ackextension + +import ( + "sync" +) + +type InMemoryAckExtension struct { + partitionMap sync.Map +} + +func NewInMemoryAckExtension() AckExtension { + return &InMemoryAckExtension{} +} + +type ackStatus struct { + id uint64 + ackMap sync.Map +} + +func newAckStatus() *ackStatus { + id := uint64(0) + ret := ackStatus{id: id} + ret.ackMap.Store(id, false) + return &ret +} + +func (as *ackStatus) nextAck() uint64 { + as.id += 1 + as.ackMap.Store(as.id, false) + return as.id +} + +func (as *ackStatus) ack(key uint64) { + if _, ok := as.ackMap.Load(key); ok { + as.ackMap.Store(key, true) + } +} + +func (as *ackStatus) queryAcks(ackIDs []uint64) map[uint64]bool { + result := map[uint64]bool{} + for _, val := range ackIDs { + if isAcked, ok := as.ackMap.Load(val); ok && isAcked.(bool) { + result[val] = true + as.ackMap.Delete(val) + } else { + result[val] = false + } + } + + return result +} + +func (i *InMemoryAckExtension) ProcessEvent(partitionID string) (ackID uint64) { + if actual, loaded := i.partitionMap.LoadOrStore(partitionID, newAckStatus()); loaded { + ackStats := actual.(*ackStatus) + return ackStats.nextAck() + } + + return 0 +} + +func (i *InMemoryAckExtension) Ack(partitionID string, ackID uint64) { + if val, ok := i.partitionMap.Load(partitionID); ok { + if ackStats, ok := val.(*ackStatus); ok { + ackStats.ack(ackID) + } + } +} + +func (i *InMemoryAckExtension) QueryAcks(partitionID string, ackIDs []uint64) map[uint64]bool { + if val, ok := i.partitionMap.Load(partitionID); ok { + if ackStats, ok := val.(*ackStatus); ok { + return ackStats.queryAcks(ackIDs) + } + } + + result := map[uint64]bool{} + for _, ackID := range ackIDs { + result[ackID] = false + } + + return result +} diff --git a/extension/ackextension/inmemory_test.go b/extension/ackextension/inmemory_test.go new file mode 100644 index 000000000000..30fa6354fe18 --- /dev/null +++ b/extension/ackextension/inmemory_test.go @@ -0,0 +1,75 @@ +package ackextension_test + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/ackextension" +) + +func TestExtensionAck(t *testing.T) { + ext := ackextension.NewInMemoryAckExtension() + + // send events through different partitions + for i := 0; i < 100; i++ { + // each partition has 3 events + for j := 0; j < 3; j++ { + ext.ProcessEvent(fmt.Sprintf("part-%d", i)) + } + } + + // non-acked events should be return false + for i := 0; i < 100; i++ { + result := ext.QueryAcks(fmt.Sprintf("part-%d", i), []uint64{0, 1, 2}) + require.Equal(t, len(result), 3) + require.Equal(t, result[0], false) + require.Equal(t, result[1], false) + require.Equal(t, result[2], false) + } + + // ack the second event of all even partitions and first and third events of all odd partitions + for i := 0; i < 100; i++ { + if i%2 == 0 { + ext.Ack(fmt.Sprintf("part-%d", i), 1) + } else { + ext.Ack(fmt.Sprintf("part-%d", i), 0) + ext.Ack(fmt.Sprintf("part-%d", i), 2) + } + } + + // second event of even partitions should be acked, and first and third events of odd partitions should be acked + for i := 0; i < 100; i++ { + if i%2 == 0 { + result := ext.QueryAcks(fmt.Sprintf("part-%d", i), []uint64{0, 1, 2}) + require.Equal(t, len(result), 3) + require.Equal(t, result[0], false) + require.Equal(t, result[1], true) + require.Equal(t, result[2], false) + } else { + result := ext.QueryAcks(fmt.Sprintf("part-%d", i), []uint64{0, 1, 2}) + require.Equal(t, len(result), 3) + require.Equal(t, result[0], true) + require.Equal(t, result[1], false) + require.Equal(t, result[2], true) + } + } + + // querying the same acked events should result in false + for i := 0; i < 100; i++ { + if i%2 == 0 { + result := ext.QueryAcks(fmt.Sprintf("part-%d", i), []uint64{0, 1, 2}) + require.Equal(t, len(result), 3) + require.Equal(t, result[0], false) + require.Equal(t, result[1], false) + require.Equal(t, result[2], false) + } else { + result := ext.QueryAcks(fmt.Sprintf("part-%d", i), []uint64{0, 1, 2}) + require.Equal(t, len(result), 3) + require.Equal(t, result[0], false) + require.Equal(t, result[1], false) + require.Equal(t, result[2], false) + } + } +} From 24c10916f06076518f5cf8c3d12e75cf0866da0e Mon Sep 17 00:00:00 2001 From: Peng Zhu Date: Fri, 8 Mar 2024 16:19:47 -0800 Subject: [PATCH 02/15] address MR comments --- extension/ackextension/config.go | 5 ++++ extension/ackextension/inmemory.go | 34 ++++++++++++++----------- extension/ackextension/inmemory_test.go | 3 +++ 3 files changed, 27 insertions(+), 15 deletions(-) diff --git a/extension/ackextension/config.go b/extension/ackextension/config.go index e9e36b42f1a5..bf2840a91305 100644 --- a/extension/ackextension/config.go +++ b/extension/ackextension/config.go @@ -10,4 +10,9 @@ import ( type Config struct { // StorageID defines the storage type of the extension. In-memory type is set by default (if not provided). Future consideration is disk type. StorageID *component.ID `mapstructure:"storage"` + // MaxNumPartition Specifies the maximum number of partitions that clients can acquire for this extension instance. + // Implementation defines how limit exceeding should be handled + MaxNumPartition uint64 + // MaxNumPendingAcksPerPartition Specifies the maximum number of ackIDs and their corresponding status information that are waiting to be queried in each partition. + MaxNumPendingAcksPerPartition uint64 } diff --git a/extension/ackextension/inmemory.go b/extension/ackextension/inmemory.go index 63d98f866e8b..eb609a4c5aeb 100644 --- a/extension/ackextension/inmemory.go +++ b/extension/ackextension/inmemory.go @@ -1,7 +1,11 @@ -package ackextension +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package ackextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/ackextension" import ( "sync" + "sync/atomic" ) type InMemoryAckExtension struct { @@ -13,21 +17,21 @@ func NewInMemoryAckExtension() AckExtension { } type ackStatus struct { - id uint64 + id atomic.Uint64 ackMap sync.Map } func newAckStatus() *ackStatus { id := uint64(0) - ret := ackStatus{id: id} + ret := ackStatus{} + ret.id.Store(id) ret.ackMap.Store(id, false) return &ret } func (as *ackStatus) nextAck() uint64 { - as.id += 1 - as.ackMap.Store(as.id, false) - return as.id + as.ackMap.Store(as.id.Add(1), false) + return as.id.Load() } func (as *ackStatus) ack(key uint64) { @@ -36,8 +40,8 @@ func (as *ackStatus) ack(key uint64) { } } -func (as *ackStatus) queryAcks(ackIDs []uint64) map[uint64]bool { - result := map[uint64]bool{} +func (as *ackStatus) computeAcks(ackIDs []uint64) map[uint64]bool { + result := make(map[uint64]bool, len(ackIDs)) for _, val := range ackIDs { if isAcked, ok := as.ackMap.Load(val); ok && isAcked.(bool) { result[val] = true @@ -52,8 +56,8 @@ func (as *ackStatus) queryAcks(ackIDs []uint64) map[uint64]bool { func (i *InMemoryAckExtension) ProcessEvent(partitionID string) (ackID uint64) { if actual, loaded := i.partitionMap.LoadOrStore(partitionID, newAckStatus()); loaded { - ackStats := actual.(*ackStatus) - return ackStats.nextAck() + status := actual.(*ackStatus) + return status.nextAck() } return 0 @@ -61,20 +65,20 @@ func (i *InMemoryAckExtension) ProcessEvent(partitionID string) (ackID uint64) { func (i *InMemoryAckExtension) Ack(partitionID string, ackID uint64) { if val, ok := i.partitionMap.Load(partitionID); ok { - if ackStats, ok := val.(*ackStatus); ok { - ackStats.ack(ackID) + if status, ok := val.(*ackStatus); ok { + status.ack(ackID) } } } func (i *InMemoryAckExtension) QueryAcks(partitionID string, ackIDs []uint64) map[uint64]bool { if val, ok := i.partitionMap.Load(partitionID); ok { - if ackStats, ok := val.(*ackStatus); ok { - return ackStats.queryAcks(ackIDs) + if status, ok := val.(*ackStatus); ok { + return status.computeAcks(ackIDs) } } - result := map[uint64]bool{} + result := make(map[uint64]bool, len(ackIDs)) for _, ackID := range ackIDs { result[ackID] = false } diff --git a/extension/ackextension/inmemory_test.go b/extension/ackextension/inmemory_test.go index 30fa6354fe18..ba0dd7bdc899 100644 --- a/extension/ackextension/inmemory_test.go +++ b/extension/ackextension/inmemory_test.go @@ -1,3 +1,6 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + package ackextension_test import ( From 74c21ea80ef0a10e02bcdb05769047eb70bf9758 Mon Sep 17 00:00:00 2001 From: Peng Zhu Date: Mon, 11 Mar 2024 11:17:12 -0700 Subject: [PATCH 03/15] fix lint --- extension/ackextension/inmemory_test.go | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/extension/ackextension/inmemory_test.go b/extension/ackextension/inmemory_test.go index ba0dd7bdc899..6cb56f002fd7 100644 --- a/extension/ackextension/inmemory_test.go +++ b/extension/ackextension/inmemory_test.go @@ -61,18 +61,10 @@ func TestExtensionAck(t *testing.T) { // querying the same acked events should result in false for i := 0; i < 100; i++ { - if i%2 == 0 { - result := ext.QueryAcks(fmt.Sprintf("part-%d", i), []uint64{0, 1, 2}) - require.Equal(t, len(result), 3) - require.Equal(t, result[0], false) - require.Equal(t, result[1], false) - require.Equal(t, result[2], false) - } else { - result := ext.QueryAcks(fmt.Sprintf("part-%d", i), []uint64{0, 1, 2}) - require.Equal(t, len(result), 3) - require.Equal(t, result[0], false) - require.Equal(t, result[1], false) - require.Equal(t, result[2], false) - } + result := ext.QueryAcks(fmt.Sprintf("part-%d", i), []uint64{0, 1, 2}) + require.Equal(t, len(result), 3) + require.Equal(t, result[0], false) + require.Equal(t, result[1], false) + require.Equal(t, result[2], false) } } From 808f9a0ce15b0a481f46fac297bd5f392e27f7b0 Mon Sep 17 00:00:00 2001 From: Peng Zhu Date: Mon, 11 Mar 2024 12:24:30 -0700 Subject: [PATCH 04/15] add async test --- extension/ackextension/inmemory.go | 8 +-- extension/ackextension/inmemory_test.go | 82 +++++++++++++++++++++++++ 2 files changed, 86 insertions(+), 4 deletions(-) diff --git a/extension/ackextension/inmemory.go b/extension/ackextension/inmemory.go index eb609a4c5aeb..ce533c19a608 100644 --- a/extension/ackextension/inmemory.go +++ b/extension/ackextension/inmemory.go @@ -23,10 +23,10 @@ type ackStatus struct { func newAckStatus() *ackStatus { id := uint64(0) - ret := ackStatus{} - ret.id.Store(id) - ret.ackMap.Store(id, false) - return &ret + as := ackStatus{} + as.id.Store(id) + as.ackMap.Store(id, false) + return &as } func (as *ackStatus) nextAck() uint64 { diff --git a/extension/ackextension/inmemory_test.go b/extension/ackextension/inmemory_test.go index 6cb56f002fd7..4c2146ad68d6 100644 --- a/extension/ackextension/inmemory_test.go +++ b/extension/ackextension/inmemory_test.go @@ -5,6 +5,7 @@ package ackextension_test import ( "fmt" + "sync" "testing" "github.com/stretchr/testify/require" @@ -68,3 +69,84 @@ func TestExtensionAck(t *testing.T) { require.Equal(t, result[2], false) } } + +func TestExtensionAckAsync(t *testing.T) { + ext := ackextension.NewInMemoryAckExtension() + partitionCount := 100 + var wg sync.WaitGroup + wg.Add(partitionCount) + // send events through different partitions + for i := 0; i < partitionCount; i++ { + i := i + go func() { + // each partition has 3 events + for j := 0; j < 3; j++ { + ext.ProcessEvent(fmt.Sprintf("part-%d", i)) + } + wg.Done() + }() + } + + wg.Wait() + + // non-acked events should be return false + for i := 0; i < partitionCount; i++ { + result := ext.QueryAcks(fmt.Sprintf("part-%d", i), []uint64{0, 1, 2}) + require.Equal(t, len(result), 3) + require.Equal(t, result[0], false) + require.Equal(t, result[1], false) + require.Equal(t, result[2], false) + } + + wg.Add(partitionCount) + // ack the second event of all even partitions and first and third events of all odd partitions + for i := 0; i < partitionCount; i++ { + i := i + go func() { + if i%2 == 0 { + ext.Ack(fmt.Sprintf("part-%d", i), 1) + } else { + ext.Ack(fmt.Sprintf("part-%d", i), 0) + ext.Ack(fmt.Sprintf("part-%d", i), 2) + } + wg.Done() + }() + } + + wg.Wait() + // second event of even partitions should be acked, and first and third events of odd partitions should be acked + for i := 0; i < partitionCount; i++ { + if i%2 == 0 { + result := ext.QueryAcks(fmt.Sprintf("part-%d", i), []uint64{0, 1, 2}) + require.Equal(t, len(result), 3) + require.Equal(t, result[0], false) + require.Equal(t, result[1], true) + require.Equal(t, result[2], false) + } else { + result := ext.QueryAcks(fmt.Sprintf("part-%d", i), []uint64{0, 1, 2}) + require.Equal(t, len(result), 3) + require.Equal(t, result[0], true) + require.Equal(t, result[1], false) + require.Equal(t, result[2], true) + } + } + wg.Add(100) + resultChan := make(chan map[uint64]bool, partitionCount) + // querying the same acked events should result in false + for i := 0; i < partitionCount; i++ { + i := i + go func() { + resultChan <- ext.QueryAcks(fmt.Sprintf("part-%d", i), []uint64{0, 1, 2}) + wg.Done() + }() + } + wg.Wait() + + for i := 0; i < partitionCount; i++ { + result := <-resultChan + require.Equal(t, len(result), 3) + require.Equal(t, result[0], false) + require.Equal(t, result[1], false) + require.Equal(t, result[2], false) + } +} From d2c2b1bfecb260a7532c36e1d921af86f0702ecb Mon Sep 17 00:00:00 2001 From: Peng Zhu Date: Mon, 11 Mar 2024 13:15:04 -0700 Subject: [PATCH 05/15] fix check stage fail --- extension/ackextension/factory.go | 2 +- extension/ackextension/inmemory.go | 18 +++++++++++++++++- extension/ackextension/inmemory_test.go | 8 +++----- 3 files changed, 21 insertions(+), 7 deletions(-) diff --git a/extension/ackextension/factory.go b/extension/ackextension/factory.go index a516cb1ebc96..cbf33af9f7d3 100644 --- a/extension/ackextension/factory.go +++ b/extension/ackextension/factory.go @@ -31,5 +31,5 @@ func createDefaultConfig() component.Config { } func createExtension(_ context.Context, _ extension.CreateSettings, _ component.Config) (extension.Extension, error) { - return nil, nil + return newInMemoryAckExtension(), nil } diff --git a/extension/ackextension/inmemory.go b/extension/ackextension/inmemory.go index ce533c19a608..4516064337c0 100644 --- a/extension/ackextension/inmemory.go +++ b/extension/ackextension/inmemory.go @@ -4,15 +4,18 @@ package ackextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/ackextension" import ( + "context" "sync" "sync/atomic" + + "go.opentelemetry.io/collector/component" ) type InMemoryAckExtension struct { partitionMap sync.Map } -func NewInMemoryAckExtension() AckExtension { +func newInMemoryAckExtension() *InMemoryAckExtension { return &InMemoryAckExtension{} } @@ -54,6 +57,17 @@ func (as *ackStatus) computeAcks(ackIDs []uint64) map[uint64]bool { return result } +// Start of InMemoryAckExtension does nothing and returns nil +func (i *InMemoryAckExtension) Start(_ context.Context, _ component.Host) error { + return nil +} + +// Shutdown of InMemoryAckExtension does nothing and returns nil +func (i *InMemoryAckExtension) Shutdown(_ context.Context) error { + return nil +} + +// ProcessEvent marks the beginning of processing an event. It generates an ack ID for the associated partition ID. func (i *InMemoryAckExtension) ProcessEvent(partitionID string) (ackID uint64) { if actual, loaded := i.partitionMap.LoadOrStore(partitionID, newAckStatus()); loaded { status := actual.(*ackStatus) @@ -63,6 +77,7 @@ func (i *InMemoryAckExtension) ProcessEvent(partitionID string) (ackID uint64) { return 0 } +// Ack acknowledges an event has been processed. func (i *InMemoryAckExtension) Ack(partitionID string, ackID uint64) { if val, ok := i.partitionMap.Load(partitionID); ok { if status, ok := val.(*ackStatus); ok { @@ -71,6 +86,7 @@ func (i *InMemoryAckExtension) Ack(partitionID string, ackID uint64) { } } +// QueryAcks checks the statuses of given ackIDs for a partition. func (i *InMemoryAckExtension) QueryAcks(partitionID string, ackIDs []uint64) map[uint64]bool { if val, ok := i.partitionMap.Load(partitionID); ok { if status, ok := val.(*ackStatus); ok { diff --git a/extension/ackextension/inmemory_test.go b/extension/ackextension/inmemory_test.go index 4c2146ad68d6..f9e3ed869629 100644 --- a/extension/ackextension/inmemory_test.go +++ b/extension/ackextension/inmemory_test.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package ackextension_test +package ackextension import ( "fmt" @@ -9,12 +9,10 @@ import ( "testing" "github.com/stretchr/testify/require" - - "github.com/open-telemetry/opentelemetry-collector-contrib/extension/ackextension" ) func TestExtensionAck(t *testing.T) { - ext := ackextension.NewInMemoryAckExtension() + ext := newInMemoryAckExtension() // send events through different partitions for i := 0; i < 100; i++ { @@ -71,7 +69,7 @@ func TestExtensionAck(t *testing.T) { } func TestExtensionAckAsync(t *testing.T) { - ext := ackextension.NewInMemoryAckExtension() + ext := newInMemoryAckExtension() partitionCount := 100 var wg sync.WaitGroup wg.Add(partitionCount) From 365cdba5ec3467ec60bf0adfeb49632a35436414 Mon Sep 17 00:00:00 2001 From: Peng Zhu Date: Mon, 11 Mar 2024 15:35:09 -0700 Subject: [PATCH 06/15] add goleak check --- extension/ackextension/go.mod | 1 + extension/ackextension/package_test.go | 14 ++++++++++++++ 2 files changed, 15 insertions(+) create mode 100644 extension/ackextension/package_test.go diff --git a/extension/ackextension/go.mod b/extension/ackextension/go.mod index 586a98f35078..e0e329efa7d2 100644 --- a/extension/ackextension/go.mod +++ b/extension/ackextension/go.mod @@ -8,6 +8,7 @@ require ( go.opentelemetry.io/collector/extension v0.95.0 go.opentelemetry.io/otel/metric v1.23.1 go.opentelemetry.io/otel/trace v1.23.1 + go.uber.org/goleak v1.3.0 ) require ( diff --git a/extension/ackextension/package_test.go b/extension/ackextension/package_test.go new file mode 100644 index 000000000000..97b324bc2f00 --- /dev/null +++ b/extension/ackextension/package_test.go @@ -0,0 +1,14 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package ackextension + +import ( + "testing" + + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} From d4e660bc80d8fedc069a7773c61b98a684fcb020 Mon Sep 17 00:00:00 2001 From: Peng Zhu Date: Tue, 12 Mar 2024 13:03:05 -0700 Subject: [PATCH 07/15] Add LRU --- extension/ackextension/README.md | 5 +- extension/ackextension/config.go | 23 +++++++-- extension/ackextension/factory.go | 4 +- extension/ackextension/go.mod | 1 + extension/ackextension/go.sum | 2 + extension/ackextension/inmemory.go | 54 ++++++++++++--------- extension/ackextension/inmemory_test.go | 10 +++- extension/ackextension/testdata/config.yaml | 4 +- 8 files changed, 69 insertions(+), 34 deletions(-) diff --git a/extension/ackextension/README.md b/extension/ackextension/README.md index 84eb23c0e72c..361cd1cae4b9 100644 --- a/extension/ackextension/README.md +++ b/extension/ackextension/README.md @@ -19,13 +19,14 @@ if ack fails. ```yaml extensions: ack: - storage: + storage: + max_number_of_partition: 1000000 + max_number_of_pending_acks_per_partition: 1000000 receivers: splunk_hec: ack_extension: ack - service: extensions: [ack] pipelines: diff --git a/extension/ackextension/config.go b/extension/ackextension/config.go index bf2840a91305..e58a49f7d1d3 100644 --- a/extension/ackextension/config.go +++ b/extension/ackextension/config.go @@ -6,13 +6,30 @@ import ( "go.opentelemetry.io/collector/component" ) +const ( + defaultMaxNumPartition = 1000000 + defaultMaxNumPendingAcksPerPartition = 1000000 +) + // Config defines configuration for ack extension type Config struct { // StorageID defines the storage type of the extension. In-memory type is set by default (if not provided). Future consideration is disk type. StorageID *component.ID `mapstructure:"storage"` // MaxNumPartition Specifies the maximum number of partitions that clients can acquire for this extension instance. - // Implementation defines how limit exceeding should be handled - MaxNumPartition uint64 + // Implementation defines how limit exceeding should be handled. + MaxNumPartition uint64 `mapstructure:"max_number_of_partition"` // MaxNumPendingAcksPerPartition Specifies the maximum number of ackIDs and their corresponding status information that are waiting to be queried in each partition. - MaxNumPendingAcksPerPartition uint64 + MaxNumPendingAcksPerPartition uint64 `mapstructure:"max_number_of_pending_acks_per_partition"` +} + +func (cfg *Config) Validate() error { + if cfg.MaxNumPartition <= 0 { + cfg.MaxNumPartition = defaultMaxNumPartition + } + + if cfg.MaxNumPendingAcksPerPartition <= 0 { + cfg.MaxNumPendingAcksPerPartition = defaultMaxNumPendingAcksPerPartition + } + + return nil } diff --git a/extension/ackextension/factory.go b/extension/ackextension/factory.go index cbf33af9f7d3..f734e5ac5919 100644 --- a/extension/ackextension/factory.go +++ b/extension/ackextension/factory.go @@ -30,6 +30,6 @@ func createDefaultConfig() component.Config { } } -func createExtension(_ context.Context, _ extension.CreateSettings, _ component.Config) (extension.Extension, error) { - return newInMemoryAckExtension(), nil +func createExtension(_ context.Context, _ extension.CreateSettings, cfg component.Config) (extension.Extension, error) { + return newInMemoryAckExtension(cfg.(*Config)), nil } diff --git a/extension/ackextension/go.mod b/extension/ackextension/go.mod index 3fc913fe9f13..79a8401d781a 100644 --- a/extension/ackextension/go.mod +++ b/extension/ackextension/go.mod @@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/extension/ackex go 1.21 require ( + github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.96.1-0.20240306115632-b2693620eff6 go.opentelemetry.io/collector/extension v0.96.1-0.20240306115632-b2693620eff6 diff --git a/extension/ackextension/go.sum b/extension/ackextension/go.sum index 85c13ca75f6b..5d5c822d7e84 100644 --- a/extension/ackextension/go.sum +++ b/extension/ackextension/go.sum @@ -18,6 +18,8 @@ github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs= diff --git a/extension/ackextension/inmemory.go b/extension/ackextension/inmemory.go index 4516064337c0..3b75c40d4a9a 100644 --- a/extension/ackextension/inmemory.go +++ b/extension/ackextension/inmemory.go @@ -5,50 +5,60 @@ package ackextension // import "github.com/open-telemetry/opentelemetry-collecto import ( "context" - "sync" "sync/atomic" + lru "github.com/hashicorp/golang-lru/v2" "go.opentelemetry.io/collector/component" ) type InMemoryAckExtension struct { - partitionMap sync.Map + partitionMap *lru.Cache[string, *ackStatus] + maxNumPendingAcksPerPartition uint64 } -func newInMemoryAckExtension() *InMemoryAckExtension { - return &InMemoryAckExtension{} +func newInMemoryAckExtension(conf *Config) *InMemoryAckExtension { + cache, _ := lru.New[string, *ackStatus](int(conf.MaxNumPartition)) + return &InMemoryAckExtension{ + partitionMap: cache, + maxNumPendingAcksPerPartition: defaultMaxNumPendingAcksPerPartition, + } } type ackStatus struct { id atomic.Uint64 - ackMap sync.Map + ackMap *lru.Cache[uint64, bool] } -func newAckStatus() *ackStatus { +func newAckStatus(maxPendingAcks uint64) *ackStatus { id := uint64(0) - as := ackStatus{} + + cache, _ := lru.New[uint64, bool](int(maxPendingAcks)) + cache.Add(id, false) + + as := ackStatus{ + ackMap: cache, + } as.id.Store(id) - as.ackMap.Store(id, false) return &as } func (as *ackStatus) nextAck() uint64 { - as.ackMap.Store(as.id.Add(1), false) + as.ackMap.Add(as.id.Add(1), false) return as.id.Load() } func (as *ackStatus) ack(key uint64) { - if _, ok := as.ackMap.Load(key); ok { - as.ackMap.Store(key, true) + if _, ok := as.ackMap.Get(key); ok { + as.ackMap.Add(key, true) } } func (as *ackStatus) computeAcks(ackIDs []uint64) map[uint64]bool { result := make(map[uint64]bool, len(ackIDs)) for _, val := range ackIDs { - if isAcked, ok := as.ackMap.Load(val); ok && isAcked.(bool) { + if isAcked, ok := as.ackMap.Get(val); ok && isAcked { result[val] = true - as.ackMap.Delete(val) + as.ackMap.Remove(val) } else { result[val] = false } @@ -69,29 +79,25 @@ func (i *InMemoryAckExtension) Shutdown(_ context.Context) error { // ProcessEvent marks the beginning of processing an event. It generates an ack ID for the associated partition ID. func (i *InMemoryAckExtension) ProcessEvent(partitionID string) (ackID uint64) { - if actual, loaded := i.partitionMap.LoadOrStore(partitionID, newAckStatus()); loaded { - status := actual.(*ackStatus) - return status.nextAck() + if val, ok := i.partitionMap.Get(partitionID); ok { + return val.nextAck() } + i.partitionMap.Add(partitionID, newAckStatus(i.maxNumPendingAcksPerPartition)) return 0 } // Ack acknowledges an event has been processed. func (i *InMemoryAckExtension) Ack(partitionID string, ackID uint64) { - if val, ok := i.partitionMap.Load(partitionID); ok { - if status, ok := val.(*ackStatus); ok { - status.ack(ackID) - } + if val, ok := i.partitionMap.Get(partitionID); ok { + val.ack(ackID) } } // QueryAcks checks the statuses of given ackIDs for a partition. func (i *InMemoryAckExtension) QueryAcks(partitionID string, ackIDs []uint64) map[uint64]bool { - if val, ok := i.partitionMap.Load(partitionID); ok { - if status, ok := val.(*ackStatus); ok { - return status.computeAcks(ackIDs) - } + if val, ok := i.partitionMap.Get(partitionID); ok { + return val.computeAcks(ackIDs) } result := make(map[uint64]bool, len(ackIDs)) diff --git a/extension/ackextension/inmemory_test.go b/extension/ackextension/inmemory_test.go index f9e3ed869629..d2a38a43f5a6 100644 --- a/extension/ackextension/inmemory_test.go +++ b/extension/ackextension/inmemory_test.go @@ -12,7 +12,11 @@ import ( ) func TestExtensionAck(t *testing.T) { - ext := newInMemoryAckExtension() + conf := Config{ + MaxNumPartition: defaultMaxNumPartition, + MaxNumPendingAcksPerPartition: defaultMaxNumPendingAcksPerPartition, + } + ext := newInMemoryAckExtension(&conf) // send events through different partitions for i := 0; i < 100; i++ { @@ -69,7 +73,9 @@ func TestExtensionAck(t *testing.T) { } func TestExtensionAckAsync(t *testing.T) { - ext := newInMemoryAckExtension() + conf := Config{} + ext := newInMemoryAckExtension(&conf) + partitionCount := 100 var wg sync.WaitGroup wg.Add(partitionCount) diff --git a/extension/ackextension/testdata/config.yaml b/extension/ackextension/testdata/config.yaml index 65e6f6b770ad..6e8aeff0ebfa 100644 --- a/extension/ackextension/testdata/config.yaml +++ b/extension/ackextension/testdata/config.yaml @@ -3,6 +3,8 @@ ack: ack/withmemorystorage: storage: + ack/withpersistentstorage: storage: file_storage/otc - + max_number_of_partition: 200000 + max_number_of_pending_acks_per_partition: 3000000 \ No newline at end of file From 624311a7d6d34a786b1c9e4374bacce7e19511e8 Mon Sep 17 00:00:00 2001 From: Peng Zhu Date: Tue, 12 Mar 2024 15:17:28 -0700 Subject: [PATCH 08/15] Address PR comments --- extension/ackextension/ackextension.go | 1 + extension/ackextension/config.go | 4 +- extension/ackextension/inmemory.go | 22 +++--- extension/ackextension/inmemory_test.go | 93 ++++++++++++++++++++++++- 4 files changed, 107 insertions(+), 13 deletions(-) diff --git a/extension/ackextension/ackextension.go b/extension/ackextension/ackextension.go index f337d1bc9a2f..6e2563331b6c 100644 --- a/extension/ackextension/ackextension.go +++ b/extension/ackextension/ackextension.go @@ -14,5 +14,6 @@ type AckExtension interface { Ack(partitionID string, ackID uint64) // QueryAcks checks the statuses of given ackIDs for a partition. + // ackIDs that are not generated from ProcessEvent or have been removed as a result of previous calls to QueryAcks will return false QueryAcks(partitionID string, ackIDs []uint64) map[uint64]bool } diff --git a/extension/ackextension/config.go b/extension/ackextension/config.go index e58a49f7d1d3..0039c49bbf04 100644 --- a/extension/ackextension/config.go +++ b/extension/ackextension/config.go @@ -7,8 +7,8 @@ import ( ) const ( - defaultMaxNumPartition = 1000000 - defaultMaxNumPendingAcksPerPartition = 1000000 + defaultMaxNumPartition = 1_000_000 + defaultMaxNumPendingAcksPerPartition = 1_000_000 ) // Config defines configuration for ack extension diff --git a/extension/ackextension/inmemory.go b/extension/ackextension/inmemory.go index 3b75c40d4a9a..1e3b9b602fe8 100644 --- a/extension/ackextension/inmemory.go +++ b/extension/ackextension/inmemory.go @@ -11,49 +11,51 @@ import ( "go.opentelemetry.io/collector/component" ) +// InMemoryAckExtension is the in-memory implementation of the AckExtension type InMemoryAckExtension struct { - partitionMap *lru.Cache[string, *ackStatus] + partitionMap *lru.Cache[string, *ackPartition] maxNumPendingAcksPerPartition uint64 } func newInMemoryAckExtension(conf *Config) *InMemoryAckExtension { - cache, _ := lru.New[string, *ackStatus](int(conf.MaxNumPartition)) + cache, _ := lru.New[string, *ackPartition](int(conf.MaxNumPartition)) return &InMemoryAckExtension{ partitionMap: cache, maxNumPendingAcksPerPartition: defaultMaxNumPendingAcksPerPartition, } } -type ackStatus struct { +type ackPartition struct { id atomic.Uint64 ackMap *lru.Cache[uint64, bool] } -func newAckStatus(maxPendingAcks uint64) *ackStatus { +func newAckStatus(maxPendingAcks uint64) *ackPartition { id := uint64(0) cache, _ := lru.New[uint64, bool](int(maxPendingAcks)) cache.Add(id, false) - as := ackStatus{ + as := ackPartition{ ackMap: cache, } as.id.Store(id) return &as } -func (as *ackStatus) nextAck() uint64 { - as.ackMap.Add(as.id.Add(1), false) - return as.id.Load() +func (as *ackPartition) nextAck() uint64 { + id := as.id.Add(1) + as.ackMap.Add(id, false) + return id } -func (as *ackStatus) ack(key uint64) { +func (as *ackPartition) ack(key uint64) { if _, ok := as.ackMap.Get(key); ok { as.ackMap.Add(key, true) } } -func (as *ackStatus) computeAcks(ackIDs []uint64) map[uint64]bool { +func (as *ackPartition) computeAcks(ackIDs []uint64) map[uint64]bool { result := make(map[uint64]bool, len(ackIDs)) for _, val := range ackIDs { if isAcked, ok := as.ackMap.Get(val); ok && isAcked { diff --git a/extension/ackextension/inmemory_test.go b/extension/ackextension/inmemory_test.go index d2a38a43f5a6..8b1bbc8f95ae 100644 --- a/extension/ackextension/inmemory_test.go +++ b/extension/ackextension/inmemory_test.go @@ -11,7 +11,39 @@ import ( "github.com/stretchr/testify/require" ) -func TestExtensionAck(t *testing.T) { +func TestAckPartitionNextAckConcurrency(t *testing.T) { + ackSize := 1_000_000 + ap := newAckStatus(uint64(ackSize)) + ackIdMap := sync.Map{} + wg := sync.WaitGroup{} + wg.Add(2) + go func() { + for i := 0; i < ackSize/2; i++ { + ackIdMap.Store(ap.nextAck(), struct { + }{}) + } + wg.Done() + }() + go func() { + for i := 0; i < ackSize/2; i++ { + ackIdMap.Store(ap.nextAck(), struct { + }{}) + } + wg.Done() + }() + + wg.Wait() + + var size int + ackIdMap.Range(func(k, v interface{}) bool { + size++ + return true + }) + + require.Equal(t, size, ackSize) +} + +func TestExtensionAck_ProcessEvents_EventsUnAcked(t *testing.T) { conf := Config{ MaxNumPartition: defaultMaxNumPartition, MaxNumPendingAcksPerPartition: defaultMaxNumPendingAcksPerPartition, @@ -34,6 +66,65 @@ func TestExtensionAck(t *testing.T) { require.Equal(t, result[1], false) require.Equal(t, result[2], false) } +} + +func TestExtensionAck_ProcessEvents_EventsAcked(t *testing.T) { + conf := Config{ + MaxNumPartition: defaultMaxNumPartition, + MaxNumPendingAcksPerPartition: defaultMaxNumPendingAcksPerPartition, + } + ext := newInMemoryAckExtension(&conf) + + // send events through different partitions + for i := 0; i < 100; i++ { + // each partition has 3 events + for j := 0; j < 3; j++ { + ext.ProcessEvent(fmt.Sprintf("part-%d", i)) + } + } + + // ack the second event of all even partitions and first and third events of all odd partitions + for i := 0; i < 100; i++ { + if i%2 == 0 { + ext.Ack(fmt.Sprintf("part-%d", i), 1) + } else { + ext.Ack(fmt.Sprintf("part-%d", i), 0) + ext.Ack(fmt.Sprintf("part-%d", i), 2) + } + } + + // second event of even partitions should be acked, and first and third events of odd partitions should be acked + for i := 0; i < 100; i++ { + if i%2 == 0 { + result := ext.QueryAcks(fmt.Sprintf("part-%d", i), []uint64{0, 1, 2}) + require.Equal(t, len(result), 3) + require.Equal(t, result[0], false) + require.Equal(t, result[1], true) + require.Equal(t, result[2], false) + } else { + result := ext.QueryAcks(fmt.Sprintf("part-%d", i), []uint64{0, 1, 2}) + require.Equal(t, len(result), 3) + require.Equal(t, result[0], true) + require.Equal(t, result[1], false) + require.Equal(t, result[2], true) + } + } +} + +func TestExtensionAck_QueryAcks_Unidempotent(t *testing.T) { + conf := Config{ + MaxNumPartition: defaultMaxNumPartition, + MaxNumPendingAcksPerPartition: defaultMaxNumPendingAcksPerPartition, + } + ext := newInMemoryAckExtension(&conf) + + // send events through different partitions + for i := 0; i < 100; i++ { + // each partition has 3 events + for j := 0; j < 3; j++ { + ext.ProcessEvent(fmt.Sprintf("part-%d", i)) + } + } // ack the second event of all even partitions and first and third events of all odd partitions for i := 0; i < 100; i++ { From 2baa40e7747255e189778214c72858ae884c9e58 Mon Sep 17 00:00:00 2001 From: Peng Zhu Date: Tue, 12 Mar 2024 16:04:42 -0700 Subject: [PATCH 09/15] fix lint --- extension/ackextension/inmemory_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extension/ackextension/inmemory_test.go b/extension/ackextension/inmemory_test.go index 50275c1d9e63..3653762c59ae 100644 --- a/extension/ackextension/inmemory_test.go +++ b/extension/ackextension/inmemory_test.go @@ -35,7 +35,7 @@ func TestAckPartitionNextAckConcurrency(t *testing.T) { wg.Wait() var size int - ackIDMap.Range(func(k, v interface{}) bool { + ackIDMap.Range(func(_, _ any) bool { size++ return true }) From d9754928bacb87de3bf9e0dc93795f738a663944 Mon Sep 17 00:00:00 2001 From: Peng Zhu Date: Wed, 13 Mar 2024 00:59:02 -0700 Subject: [PATCH 10/15] Address PR comments --- extension/ackextension/config.go | 5 +++-- extension/ackextension/config_test.go | 26 +++++++++++++++++++++++++ extension/ackextension/inmemory.go | 19 ++++++++---------- extension/ackextension/inmemory_test.go | 21 +++++++++----------- 4 files changed, 46 insertions(+), 25 deletions(-) create mode 100644 extension/ackextension/config_test.go diff --git a/extension/ackextension/config.go b/extension/ackextension/config.go index 0039c49bbf04..9bba27c98e28 100644 --- a/extension/ackextension/config.go +++ b/extension/ackextension/config.go @@ -22,12 +22,13 @@ type Config struct { MaxNumPendingAcksPerPartition uint64 `mapstructure:"max_number_of_pending_acks_per_partition"` } +// Validate checks that valid inputs are provided. Otherwise, assign default values. func (cfg *Config) Validate() error { - if cfg.MaxNumPartition <= 0 { + if cfg.MaxNumPartition == 0 { cfg.MaxNumPartition = defaultMaxNumPartition } - if cfg.MaxNumPendingAcksPerPartition <= 0 { + if cfg.MaxNumPendingAcksPerPartition == 0 { cfg.MaxNumPendingAcksPerPartition = defaultMaxNumPendingAcksPerPartition } diff --git a/extension/ackextension/config_test.go b/extension/ackextension/config_test.go new file mode 100644 index 000000000000..36126ccd1348 --- /dev/null +++ b/extension/ackextension/config_test.go @@ -0,0 +1,26 @@ +package ackextension + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestValidateConfigWithEmptyOrZeroValConfig(t *testing.T) { + cfg := Config{} + err := cfg.Validate() + require.Equal(t, err, nil) + require.Equal(t, cfg.MaxNumPendingAcksPerPartition, defaultMaxNumPendingAcksPerPartition) + require.Equal(t, cfg.MaxNumPartition, defaultMaxNumPartition) +} + +func TestValidateConfigWithValidConfig(t *testing.T) { + cfg := Config{ + MaxNumPendingAcksPerPartition: 3, + MaxNumPartition: 5, + } + err := cfg.Validate() + require.Equal(t, err, nil) + require.Equal(t, cfg.MaxNumPendingAcksPerPartition, 3) + require.Equal(t, cfg.MaxNumPartition, 5) +} diff --git a/extension/ackextension/inmemory.go b/extension/ackextension/inmemory.go index 1e3b9b602fe8..6a75d757adea 100644 --- a/extension/ackextension/inmemory.go +++ b/extension/ackextension/inmemory.go @@ -12,6 +12,8 @@ import ( ) // InMemoryAckExtension is the in-memory implementation of the AckExtension +// When MaxNumPartition is reached, the acks associated with the least recently used partition are evicted. +// When MaxNumPendingAcksPerPartition is reached, the least recently used ack is evicted type InMemoryAckExtension struct { partitionMap *lru.Cache[string, *ackPartition] maxNumPendingAcksPerPartition uint64 @@ -21,7 +23,7 @@ func newInMemoryAckExtension(conf *Config) *InMemoryAckExtension { cache, _ := lru.New[string, *ackPartition](int(conf.MaxNumPartition)) return &InMemoryAckExtension{ partitionMap: cache, - maxNumPendingAcksPerPartition: defaultMaxNumPendingAcksPerPartition, + maxNumPendingAcksPerPartition: conf.MaxNumPendingAcksPerPartition, } } @@ -30,17 +32,11 @@ type ackPartition struct { ackMap *lru.Cache[uint64, bool] } -func newAckStatus(maxPendingAcks uint64) *ackPartition { - id := uint64(0) - +func newAckPartition(maxPendingAcks uint64) *ackPartition { cache, _ := lru.New[uint64, bool](int(maxPendingAcks)) - cache.Add(id, false) - - as := ackPartition{ + return &ackPartition{ ackMap: cache, } - as.id.Store(id) - return &as } func (as *ackPartition) nextAck() uint64 { @@ -85,8 +81,9 @@ func (i *InMemoryAckExtension) ProcessEvent(partitionID string) (ackID uint64) { return val.nextAck() } - i.partitionMap.Add(partitionID, newAckStatus(i.maxNumPendingAcksPerPartition)) - return 0 + i.partitionMap.ContainsOrAdd(partitionID, newAckPartition(i.maxNumPendingAcksPerPartition)) + val, _ := i.partitionMap.Get(partitionID) + return val.nextAck() } // Ack acknowledges an event has been processed. diff --git a/extension/ackextension/inmemory_test.go b/extension/ackextension/inmemory_test.go index 3653762c59ae..0970c31161e2 100644 --- a/extension/ackextension/inmemory_test.go +++ b/extension/ackextension/inmemory_test.go @@ -13,34 +13,31 @@ import ( func TestAckPartitionNextAckConcurrency(t *testing.T) { ackSize := 1_000_000 - ap := newAckStatus(uint64(ackSize)) - ackIDMap := sync.Map{} + ap := newAckPartition(uint64(ackSize)) + map1 := map[uint64]struct{}{} + map2 := map[uint64]struct{}{} wg := sync.WaitGroup{} wg.Add(2) go func() { for i := 0; i < ackSize/2; i++ { - ackIDMap.Store(ap.nextAck(), struct { - }{}) + map1[ap.nextAck()] = struct{}{} } wg.Done() }() go func() { for i := 0; i < ackSize/2; i++ { - ackIDMap.Store(ap.nextAck(), struct { - }{}) + map2[ap.nextAck()] = struct{}{} } wg.Done() }() wg.Wait() - var size int - ackIDMap.Range(func(_, _ any) bool { - size++ - return true - }) + for k, v := range map2 { + map1[k] = v + } - require.Equal(t, size, ackSize) + require.Equal(t, len(map1), ackSize) } func TestExtensionAck_ProcessEvents_EventsUnAcked(t *testing.T) { From 107b7d9c9ed91b49a29700181515179c460c4fcc Mon Sep 17 00:00:00 2001 From: Peng Zhu Date: Wed, 13 Mar 2024 01:11:56 -0700 Subject: [PATCH 11/15] fix lint --- extension/ackextension/config.go | 4 ++-- extension/ackextension/config_test.go | 3 +++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/extension/ackextension/config.go b/extension/ackextension/config.go index 9bba27c98e28..5fd4208512e8 100644 --- a/extension/ackextension/config.go +++ b/extension/ackextension/config.go @@ -7,8 +7,8 @@ import ( ) const ( - defaultMaxNumPartition = 1_000_000 - defaultMaxNumPendingAcksPerPartition = 1_000_000 + defaultMaxNumPartition uint64 = 1_000_000 + defaultMaxNumPendingAcksPerPartition uint64 = 1_000_000 ) // Config defines configuration for ack extension diff --git a/extension/ackextension/config_test.go b/extension/ackextension/config_test.go index 36126ccd1348..ff4c9b55f730 100644 --- a/extension/ackextension/config_test.go +++ b/extension/ackextension/config_test.go @@ -1,3 +1,6 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + package ackextension import ( From a6dc6bd2a8406622fb262abdafcbb79a7b5fd419 Mon Sep 17 00:00:00 2001 From: Peng Zhu Date: Wed, 13 Mar 2024 13:21:34 -0700 Subject: [PATCH 12/15] add test and use maps.copy --- extension/ackextension/config_test.go | 4 +- extension/ackextension/go.mod | 1 + extension/ackextension/go.sum | 2 + extension/ackextension/inmemory_test.go | 130 ++++++++++++++++-------- 4 files changed, 93 insertions(+), 44 deletions(-) diff --git a/extension/ackextension/config_test.go b/extension/ackextension/config_test.go index ff4c9b55f730..205b118bae5f 100644 --- a/extension/ackextension/config_test.go +++ b/extension/ackextension/config_test.go @@ -24,6 +24,6 @@ func TestValidateConfigWithValidConfig(t *testing.T) { } err := cfg.Validate() require.Equal(t, err, nil) - require.Equal(t, cfg.MaxNumPendingAcksPerPartition, 3) - require.Equal(t, cfg.MaxNumPartition, 5) + require.Equal(t, cfg.MaxNumPendingAcksPerPartition, uint64(3)) + require.Equal(t, cfg.MaxNumPartition, uint64(5)) } diff --git a/extension/ackextension/go.mod b/extension/ackextension/go.mod index 79a8401d781a..ade71388a21c 100644 --- a/extension/ackextension/go.mod +++ b/extension/ackextension/go.mod @@ -10,6 +10,7 @@ require ( go.opentelemetry.io/otel/metric v1.24.0 go.opentelemetry.io/otel/trace v1.24.0 go.uber.org/goleak v1.3.0 + golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 ) require ( diff --git a/extension/ackextension/go.sum b/extension/ackextension/go.sum index 5d5c822d7e84..c5d05f384056 100644 --- a/extension/ackextension/go.sum +++ b/extension/ackextension/go.sum @@ -83,6 +83,8 @@ go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 h1:LfspQV/FYTatPTr/3HzIcmiUFH7PGP+OQ6mgDYo3yuQ= +golang.org/x/exp v0.0.0-20240222234643-814bf88cf225/go.mod h1:CxmFvTBINI24O/j8iY7H1xHzx2i4OsyguNBmN/uPtqc= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= diff --git a/extension/ackextension/inmemory_test.go b/extension/ackextension/inmemory_test.go index 0970c31161e2..8e893866a5f7 100644 --- a/extension/ackextension/inmemory_test.go +++ b/extension/ackextension/inmemory_test.go @@ -5,6 +5,7 @@ package ackextension import ( "fmt" + "maps" "sync" "testing" @@ -33,11 +34,56 @@ func TestAckPartitionNextAckConcurrency(t *testing.T) { wg.Wait() - for k, v := range map2 { - map1[k] = v + maps.Copy(map1, map2) + require.Equal(t, len(map1), ackSize) +} + +func TestExtensionAck_ProcessEvents_Concurrency(t *testing.T) { + partitionName := "partition-name" + conf := Config{ + MaxNumPartition: defaultMaxNumPartition, + MaxNumPendingAcksPerPartition: defaultMaxNumPendingAcksPerPartition, } + ext := newInMemoryAckExtension(&conf) - require.Equal(t, len(map1), ackSize) + var wg sync.WaitGroup + wg.Add(3) + + map1 := map[uint64]struct{}{} + map2 := map[uint64]struct{}{} + map3 := map[uint64]struct{}{} + + // send events through different partitions + go func() { + for i := 0; i < 100; i++ { + // each partition has 3 events + map1[ext.ProcessEvent(fmt.Sprint(partitionName))] = struct{}{} + } + wg.Done() + }() + + go func() { + for i := 0; i < 100; i++ { + // each partition has 3 events + map2[ext.ProcessEvent(fmt.Sprint(partitionName))] = struct{}{} + } + wg.Done() + }() + + go func() { + for i := 0; i < 100; i++ { + // each partition has 3 events + map3[ext.ProcessEvent(fmt.Sprint(partitionName))] = struct{}{} + } + wg.Done() + }() + + wg.Wait() + + maps.Copy(map1, map2) + maps.Copy(map1, map3) + + require.Equal(t, len(map1), 300) } func TestExtensionAck_ProcessEvents_EventsUnAcked(t *testing.T) { @@ -83,27 +129,27 @@ func TestExtensionAck_ProcessEvents_EventsAcked(t *testing.T) { // ack the second event of all even partitions and first and third events of all odd partitions for i := 0; i < 100; i++ { if i%2 == 0 { - ext.Ack(fmt.Sprintf("part-%d", i), 1) - } else { - ext.Ack(fmt.Sprintf("part-%d", i), 0) ext.Ack(fmt.Sprintf("part-%d", i), 2) + } else { + ext.Ack(fmt.Sprintf("part-%d", i), 1) + ext.Ack(fmt.Sprintf("part-%d", i), 3) } } // second event of even partitions should be acked, and first and third events of odd partitions should be acked for i := 0; i < 100; i++ { if i%2 == 0 { - result := ext.QueryAcks(fmt.Sprintf("part-%d", i), []uint64{0, 1, 2}) - require.Equal(t, len(result), 3) - require.Equal(t, result[0], false) - require.Equal(t, result[1], true) - require.Equal(t, result[2], false) - } else { - result := ext.QueryAcks(fmt.Sprintf("part-%d", i), []uint64{0, 1, 2}) + result := ext.QueryAcks(fmt.Sprintf("part-%d", i), []uint64{1, 2, 3}) require.Equal(t, len(result), 3) - require.Equal(t, result[0], true) require.Equal(t, result[1], false) require.Equal(t, result[2], true) + require.Equal(t, result[3], false) + } else { + result := ext.QueryAcks(fmt.Sprintf("part-%d", i), []uint64{1, 2, 3}) + require.Equal(t, len(result), 3) + require.Equal(t, result[1], true) + require.Equal(t, result[2], false) + require.Equal(t, result[3], true) } } } @@ -126,37 +172,37 @@ func TestExtensionAck_QueryAcks_Unidempotent(t *testing.T) { // ack the second event of all even partitions and first and third events of all odd partitions for i := 0; i < 100; i++ { if i%2 == 0 { - ext.Ack(fmt.Sprintf("part-%d", i), 1) - } else { - ext.Ack(fmt.Sprintf("part-%d", i), 0) ext.Ack(fmt.Sprintf("part-%d", i), 2) + } else { + ext.Ack(fmt.Sprintf("part-%d", i), 1) + ext.Ack(fmt.Sprintf("part-%d", i), 3) } } // second event of even partitions should be acked, and first and third events of odd partitions should be acked for i := 0; i < 100; i++ { if i%2 == 0 { - result := ext.QueryAcks(fmt.Sprintf("part-%d", i), []uint64{0, 1, 2}) + result := ext.QueryAcks(fmt.Sprintf("part-%d", i), []uint64{1, 2, 3}) require.Equal(t, len(result), 3) - require.Equal(t, result[0], false) - require.Equal(t, result[1], true) - require.Equal(t, result[2], false) - } else { - result := ext.QueryAcks(fmt.Sprintf("part-%d", i), []uint64{0, 1, 2}) - require.Equal(t, len(result), 3) - require.Equal(t, result[0], true) require.Equal(t, result[1], false) require.Equal(t, result[2], true) + require.Equal(t, result[3], false) + } else { + result := ext.QueryAcks(fmt.Sprintf("part-%d", i), []uint64{1, 2, 3}) + require.Equal(t, len(result), 3) + require.Equal(t, result[1], true) + require.Equal(t, result[2], false) + require.Equal(t, result[3], true) } } // querying the same acked events should result in false for i := 0; i < 100; i++ { - result := ext.QueryAcks(fmt.Sprintf("part-%d", i), []uint64{0, 1, 2}) + result := ext.QueryAcks(fmt.Sprintf("part-%d", i), []uint64{1, 2, 3}) require.Equal(t, len(result), 3) - require.Equal(t, result[0], false) require.Equal(t, result[1], false) require.Equal(t, result[2], false) + require.Equal(t, result[3], false) } } @@ -186,11 +232,11 @@ func TestExtensionAckAsync(t *testing.T) { // non-acked events should be return false for i := 0; i < partitionCount; i++ { - result := ext.QueryAcks(fmt.Sprintf("part-%d", i), []uint64{0, 1, 2}) + result := ext.QueryAcks(fmt.Sprintf("part-%d", i), []uint64{1, 2, 3}) require.Equal(t, len(result), 3) - require.Equal(t, result[0], false) require.Equal(t, result[1], false) require.Equal(t, result[2], false) + require.Equal(t, result[3], false) } wg.Add(partitionCount) @@ -199,10 +245,10 @@ func TestExtensionAckAsync(t *testing.T) { i := i go func() { if i%2 == 0 { - ext.Ack(fmt.Sprintf("part-%d", i), 1) - } else { - ext.Ack(fmt.Sprintf("part-%d", i), 0) ext.Ack(fmt.Sprintf("part-%d", i), 2) + } else { + ext.Ack(fmt.Sprintf("part-%d", i), 1) + ext.Ack(fmt.Sprintf("part-%d", i), 3) } wg.Done() }() @@ -212,17 +258,17 @@ func TestExtensionAckAsync(t *testing.T) { // second event of even partitions should be acked, and first and third events of odd partitions should be acked for i := 0; i < partitionCount; i++ { if i%2 == 0 { - result := ext.QueryAcks(fmt.Sprintf("part-%d", i), []uint64{0, 1, 2}) + result := ext.QueryAcks(fmt.Sprintf("part-%d", i), []uint64{1, 2, 3}) require.Equal(t, len(result), 3) - require.Equal(t, result[0], false) - require.Equal(t, result[1], true) - require.Equal(t, result[2], false) - } else { - result := ext.QueryAcks(fmt.Sprintf("part-%d", i), []uint64{0, 1, 2}) - require.Equal(t, len(result), 3) - require.Equal(t, result[0], true) require.Equal(t, result[1], false) require.Equal(t, result[2], true) + require.Equal(t, result[3], false) + } else { + result := ext.QueryAcks(fmt.Sprintf("part-%d", i), []uint64{1, 2, 3}) + require.Equal(t, len(result), 3) + require.Equal(t, result[1], true) + require.Equal(t, result[2], false) + require.Equal(t, result[3], true) } } wg.Add(100) @@ -231,7 +277,7 @@ func TestExtensionAckAsync(t *testing.T) { for i := 0; i < partitionCount; i++ { i := i go func() { - resultChan <- ext.QueryAcks(fmt.Sprintf("part-%d", i), []uint64{0, 1, 2}) + resultChan <- ext.QueryAcks(fmt.Sprintf("part-%d", i), []uint64{1, 2, 3}) wg.Done() }() } @@ -240,8 +286,8 @@ func TestExtensionAckAsync(t *testing.T) { for i := 0; i < partitionCount; i++ { result := <-resultChan require.Equal(t, len(result), 3) - require.Equal(t, result[0], false) require.Equal(t, result[1], false) require.Equal(t, result[2], false) + require.Equal(t, result[3], false) } } From efb385fcd9a55c62a6b90c858bc69da1a011a15d Mon Sep 17 00:00:00 2001 From: Peng Zhu Date: Wed, 13 Mar 2024 13:50:52 -0700 Subject: [PATCH 13/15] fix go.mod --- extension/ackextension/go.mod | 1 - extension/ackextension/go.sum | 2 -- 2 files changed, 3 deletions(-) diff --git a/extension/ackextension/go.mod b/extension/ackextension/go.mod index ade71388a21c..79a8401d781a 100644 --- a/extension/ackextension/go.mod +++ b/extension/ackextension/go.mod @@ -10,7 +10,6 @@ require ( go.opentelemetry.io/otel/metric v1.24.0 go.opentelemetry.io/otel/trace v1.24.0 go.uber.org/goleak v1.3.0 - golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 ) require ( diff --git a/extension/ackextension/go.sum b/extension/ackextension/go.sum index c5d05f384056..5d5c822d7e84 100644 --- a/extension/ackextension/go.sum +++ b/extension/ackextension/go.sum @@ -83,8 +83,6 @@ go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 h1:LfspQV/FYTatPTr/3HzIcmiUFH7PGP+OQ6mgDYo3yuQ= -golang.org/x/exp v0.0.0-20240222234643-814bf88cf225/go.mod h1:CxmFvTBINI24O/j8iY7H1xHzx2i4OsyguNBmN/uPtqc= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= From 30cbf56ba4dabcefef72aa30a60ea9f45c64eeff Mon Sep 17 00:00:00 2001 From: Peng Zhu Date: Thu, 14 Mar 2024 14:30:35 -0700 Subject: [PATCH 14/15] add default values --- extension/ackextension/config.go | 5 ----- extension/ackextension/factory.go | 9 ++++++++- extension/ackextension/factory_test.go | 2 ++ 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/extension/ackextension/config.go b/extension/ackextension/config.go index 5fd4208512e8..ce5e21eba0da 100644 --- a/extension/ackextension/config.go +++ b/extension/ackextension/config.go @@ -6,11 +6,6 @@ import ( "go.opentelemetry.io/collector/component" ) -const ( - defaultMaxNumPartition uint64 = 1_000_000 - defaultMaxNumPendingAcksPerPartition uint64 = 1_000_000 -) - // Config defines configuration for ack extension type Config struct { // StorageID defines the storage type of the extension. In-memory type is set by default (if not provided). Future consideration is disk type. diff --git a/extension/ackextension/factory.go b/extension/ackextension/factory.go index f734e5ac5919..553bbc063e19 100644 --- a/extension/ackextension/factory.go +++ b/extension/ackextension/factory.go @@ -14,6 +14,11 @@ import ( var defaultStorageType = (*component.ID)(nil) +const ( + defaultMaxNumPartition uint64 = 1_000_000 + defaultMaxNumPendingAcksPerPartition uint64 = 1_000_000 +) + // NewFactory creates a factory for ack extension. func NewFactory() extension.Factory { return extension.NewFactory( @@ -26,7 +31,9 @@ func NewFactory() extension.Factory { func createDefaultConfig() component.Config { return &Config{ - StorageID: defaultStorageType, + StorageID: defaultStorageType, + MaxNumPartition: defaultMaxNumPartition, + MaxNumPendingAcksPerPartition: defaultMaxNumPendingAcksPerPartition, } } diff --git a/extension/ackextension/factory_test.go b/extension/ackextension/factory_test.go index 0e9ec8a46155..c2c2401f3fbd 100644 --- a/extension/ackextension/factory_test.go +++ b/extension/ackextension/factory_test.go @@ -17,4 +17,6 @@ func TestFactory(t *testing.T) { cfg := f.CreateDefaultConfig().(*Config) require.Equal(t, defaultStorageType, cfg.StorageID) + require.Equal(t, defaultMaxNumPendingAcksPerPartition, cfg.MaxNumPendingAcksPerPartition) + require.Equal(t, defaultMaxNumPartition, cfg.MaxNumPartition) } From a53fe3435f71657c135d8009e299b3c5e89a94ec Mon Sep 17 00:00:00 2001 From: Peng Zhu Date: Thu, 21 Mar 2024 00:48:30 -0700 Subject: [PATCH 15/15] address PR comments --- extension/ackextension/ackextension.go | 2 +- extension/ackextension/config.go | 13 ------------ extension/ackextension/config_test.go | 29 -------------------------- extension/ackextension/factory.go | 6 +++++- extension/ackextension/inmemory.go | 23 ++++++++++---------- 5 files changed, 18 insertions(+), 55 deletions(-) delete mode 100644 extension/ackextension/config_test.go diff --git a/extension/ackextension/ackextension.go b/extension/ackextension/ackextension.go index 6e2563331b6c..c1fa30ca3fa8 100644 --- a/extension/ackextension/ackextension.go +++ b/extension/ackextension/ackextension.go @@ -14,6 +14,6 @@ type AckExtension interface { Ack(partitionID string, ackID uint64) // QueryAcks checks the statuses of given ackIDs for a partition. - // ackIDs that are not generated from ProcessEvent or have been removed as a result of previous calls to QueryAcks will return false + // ackIDs that are not generated from ProcessEvent or have been removed as a result of previous calls to QueryAcks will return false. QueryAcks(partitionID string, ackIDs []uint64) map[uint64]bool } diff --git a/extension/ackextension/config.go b/extension/ackextension/config.go index ce5e21eba0da..8c363cd5ab14 100644 --- a/extension/ackextension/config.go +++ b/extension/ackextension/config.go @@ -16,16 +16,3 @@ type Config struct { // MaxNumPendingAcksPerPartition Specifies the maximum number of ackIDs and their corresponding status information that are waiting to be queried in each partition. MaxNumPendingAcksPerPartition uint64 `mapstructure:"max_number_of_pending_acks_per_partition"` } - -// Validate checks that valid inputs are provided. Otherwise, assign default values. -func (cfg *Config) Validate() error { - if cfg.MaxNumPartition == 0 { - cfg.MaxNumPartition = defaultMaxNumPartition - } - - if cfg.MaxNumPendingAcksPerPartition == 0 { - cfg.MaxNumPendingAcksPerPartition = defaultMaxNumPendingAcksPerPartition - } - - return nil -} diff --git a/extension/ackextension/config_test.go b/extension/ackextension/config_test.go deleted file mode 100644 index 205b118bae5f..000000000000 --- a/extension/ackextension/config_test.go +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package ackextension - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -func TestValidateConfigWithEmptyOrZeroValConfig(t *testing.T) { - cfg := Config{} - err := cfg.Validate() - require.Equal(t, err, nil) - require.Equal(t, cfg.MaxNumPendingAcksPerPartition, defaultMaxNumPendingAcksPerPartition) - require.Equal(t, cfg.MaxNumPartition, defaultMaxNumPartition) -} - -func TestValidateConfigWithValidConfig(t *testing.T) { - cfg := Config{ - MaxNumPendingAcksPerPartition: 3, - MaxNumPartition: 5, - } - err := cfg.Validate() - require.Equal(t, err, nil) - require.Equal(t, cfg.MaxNumPendingAcksPerPartition, uint64(3)) - require.Equal(t, cfg.MaxNumPartition, uint64(5)) -} diff --git a/extension/ackextension/factory.go b/extension/ackextension/factory.go index 553bbc063e19..048d754423d2 100644 --- a/extension/ackextension/factory.go +++ b/extension/ackextension/factory.go @@ -38,5 +38,9 @@ func createDefaultConfig() component.Config { } func createExtension(_ context.Context, _ extension.CreateSettings, cfg component.Config) (extension.Extension, error) { - return newInMemoryAckExtension(cfg.(*Config)), nil + if cfg.(*Config).StorageID == nil { + return newInMemoryAckExtension(cfg.(*Config)), nil + } + + return nil, nil } diff --git a/extension/ackextension/inmemory.go b/extension/ackextension/inmemory.go index 6a75d757adea..e9eb87e404aa 100644 --- a/extension/ackextension/inmemory.go +++ b/extension/ackextension/inmemory.go @@ -11,17 +11,17 @@ import ( "go.opentelemetry.io/collector/component" ) -// InMemoryAckExtension is the in-memory implementation of the AckExtension +// inMemoryAckExtension is the in-memory implementation of the AckExtension // When MaxNumPartition is reached, the acks associated with the least recently used partition are evicted. // When MaxNumPendingAcksPerPartition is reached, the least recently used ack is evicted -type InMemoryAckExtension struct { +type inMemoryAckExtension struct { partitionMap *lru.Cache[string, *ackPartition] maxNumPendingAcksPerPartition uint64 } -func newInMemoryAckExtension(conf *Config) *InMemoryAckExtension { +func newInMemoryAckExtension(conf *Config) *inMemoryAckExtension { cache, _ := lru.New[string, *ackPartition](int(conf.MaxNumPartition)) - return &InMemoryAckExtension{ + return &inMemoryAckExtension{ partitionMap: cache, maxNumPendingAcksPerPartition: conf.MaxNumPendingAcksPerPartition, } @@ -65,18 +65,18 @@ func (as *ackPartition) computeAcks(ackIDs []uint64) map[uint64]bool { return result } -// Start of InMemoryAckExtension does nothing and returns nil -func (i *InMemoryAckExtension) Start(_ context.Context, _ component.Host) error { +// Start of inMemoryAckExtension does nothing and returns nil +func (i *inMemoryAckExtension) Start(_ context.Context, _ component.Host) error { return nil } -// Shutdown of InMemoryAckExtension does nothing and returns nil -func (i *InMemoryAckExtension) Shutdown(_ context.Context) error { +// Shutdown of inMemoryAckExtension does nothing and returns nil +func (i *inMemoryAckExtension) Shutdown(_ context.Context) error { return nil } // ProcessEvent marks the beginning of processing an event. It generates an ack ID for the associated partition ID. -func (i *InMemoryAckExtension) ProcessEvent(partitionID string) (ackID uint64) { +func (i *inMemoryAckExtension) ProcessEvent(partitionID string) (ackID uint64) { if val, ok := i.partitionMap.Get(partitionID); ok { return val.nextAck() } @@ -87,14 +87,15 @@ func (i *InMemoryAckExtension) ProcessEvent(partitionID string) (ackID uint64) { } // Ack acknowledges an event has been processed. -func (i *InMemoryAckExtension) Ack(partitionID string, ackID uint64) { +func (i *inMemoryAckExtension) Ack(partitionID string, ackID uint64) { if val, ok := i.partitionMap.Get(partitionID); ok { val.ack(ackID) } } // QueryAcks checks the statuses of given ackIDs for a partition. -func (i *InMemoryAckExtension) QueryAcks(partitionID string, ackIDs []uint64) map[uint64]bool { +// ackIDs that are not generated from ProcessEvent or have been removed as a result of previous calls to QueryAcks will return false. +func (i *inMemoryAckExtension) QueryAcks(partitionID string, ackIDs []uint64) map[uint64]bool { if val, ok := i.partitionMap.Get(partitionID); ok { return val.computeAcks(ackIDs) }