Skip to content

Commit

Permalink
[extension/ackextension] Implement in-memory ack extension (#31651)
Browse files Browse the repository at this point in the history
**Description:** 
Adding the in-memory implementation of the Ack extension proposed in
#26376.

**Link to tracking Issue:** #26376
  • Loading branch information
zpzhuSplunk authored Mar 21, 2024
1 parent e752b99 commit 41a9700
Show file tree
Hide file tree
Showing 12 changed files with 475 additions and 6 deletions.
27 changes: 27 additions & 0 deletions .chloggen/ack_extension_impl.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: ackextension

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: adding the in-memory implementation of the ackextension

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [26376]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
5 changes: 3 additions & 2 deletions extension/ackextension/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ if ack fails.
```yaml
extensions:
ack:
storage:
storage:
max_number_of_partition: 1000000
max_number_of_pending_acks_per_partition: 1000000

receivers:
splunk_hec:
ack_extension: ack


service:
extensions: [ack]
pipelines:
Expand Down
3 changes: 2 additions & 1 deletion extension/ackextension/ackextension.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ package ackextension // import "github.com/open-telemetry/opentelemetry-collecto
// to check the status of given ack ids.
type AckExtension interface {
// ProcessEvent marks the beginning of processing an event. It generates an ack ID for the associated partition ID.
// ACK IDs are only unique within a partition. Two partitions can have the same ACK IDs but they are generated for different events.
// ACK IDs are only unique within a partition. Two partitions can have the same ACK IDs, but they are generated for different events.
ProcessEvent(partitionID string) (ackID uint64)

// Ack acknowledges an event has been processed.
Ack(partitionID string, ackID uint64)

// QueryAcks checks the statuses of given ackIDs for a partition.
// ackIDs that are not generated from ProcessEvent or have been removed as a result of previous calls to QueryAcks will return false.
QueryAcks(partitionID string, ackIDs []uint64) map[uint64]bool
}
5 changes: 5 additions & 0 deletions extension/ackextension/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,9 @@ import (
type Config struct {
// StorageID defines the storage type of the extension. In-memory type is set by default (if not provided). Future consideration is disk type.
StorageID *component.ID `mapstructure:"storage"`
// MaxNumPartition Specifies the maximum number of partitions that clients can acquire for this extension instance.
// Implementation defines how limit exceeding should be handled.
MaxNumPartition uint64 `mapstructure:"max_number_of_partition"`
// MaxNumPendingAcksPerPartition Specifies the maximum number of ackIDs and their corresponding status information that are waiting to be queried in each partition.
MaxNumPendingAcksPerPartition uint64 `mapstructure:"max_number_of_pending_acks_per_partition"`
}
15 changes: 13 additions & 2 deletions extension/ackextension/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ import (

var defaultStorageType = (*component.ID)(nil)

const (
defaultMaxNumPartition uint64 = 1_000_000
defaultMaxNumPendingAcksPerPartition uint64 = 1_000_000
)

// NewFactory creates a factory for ack extension.
func NewFactory() extension.Factory {
return extension.NewFactory(
Expand All @@ -26,10 +31,16 @@ func NewFactory() extension.Factory {

func createDefaultConfig() component.Config {
return &Config{
StorageID: defaultStorageType,
StorageID: defaultStorageType,
MaxNumPartition: defaultMaxNumPartition,
MaxNumPendingAcksPerPartition: defaultMaxNumPendingAcksPerPartition,
}
}

func createExtension(_ context.Context, _ extension.CreateSettings, _ component.Config) (extension.Extension, error) {
func createExtension(_ context.Context, _ extension.CreateSettings, cfg component.Config) (extension.Extension, error) {
if cfg.(*Config).StorageID == nil {
return newInMemoryAckExtension(cfg.(*Config)), nil
}

return nil, nil
}
2 changes: 2 additions & 0 deletions extension/ackextension/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@ func TestFactory(t *testing.T) {

cfg := f.CreateDefaultConfig().(*Config)
require.Equal(t, defaultStorageType, cfg.StorageID)
require.Equal(t, defaultMaxNumPendingAcksPerPartition, cfg.MaxNumPendingAcksPerPartition)
require.Equal(t, defaultMaxNumPartition, cfg.MaxNumPartition)
}
2 changes: 2 additions & 0 deletions extension/ackextension/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/extension/ackex
go 1.21

require (
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.96.1-0.20240315172937-3b5aee0c7a16
go.opentelemetry.io/collector/extension v0.96.1-0.20240315172937-3b5aee0c7a16
go.opentelemetry.io/otel/metric v1.24.0
go.opentelemetry.io/otel/trace v1.24.0
go.uber.org/goleak v1.3.0
)

require (
Expand Down
2 changes: 2 additions & 0 deletions extension/ackextension/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

109 changes: 109 additions & 0 deletions extension/ackextension/inmemory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package ackextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/ackextension"

import (
"context"
"sync/atomic"

lru "github.com/hashicorp/golang-lru/v2"
"go.opentelemetry.io/collector/component"
)

// inMemoryAckExtension is the in-memory implementation of the AckExtension
// When MaxNumPartition is reached, the acks associated with the least recently used partition are evicted.
// When MaxNumPendingAcksPerPartition is reached, the least recently used ack is evicted
type inMemoryAckExtension struct {
partitionMap *lru.Cache[string, *ackPartition]
maxNumPendingAcksPerPartition uint64
}

func newInMemoryAckExtension(conf *Config) *inMemoryAckExtension {
cache, _ := lru.New[string, *ackPartition](int(conf.MaxNumPartition))
return &inMemoryAckExtension{
partitionMap: cache,
maxNumPendingAcksPerPartition: conf.MaxNumPendingAcksPerPartition,
}
}

type ackPartition struct {
id atomic.Uint64
ackMap *lru.Cache[uint64, bool]
}

func newAckPartition(maxPendingAcks uint64) *ackPartition {
cache, _ := lru.New[uint64, bool](int(maxPendingAcks))
return &ackPartition{
ackMap: cache,
}
}

func (as *ackPartition) nextAck() uint64 {
id := as.id.Add(1)
as.ackMap.Add(id, false)
return id
}

func (as *ackPartition) ack(key uint64) {
if _, ok := as.ackMap.Get(key); ok {
as.ackMap.Add(key, true)
}
}

func (as *ackPartition) computeAcks(ackIDs []uint64) map[uint64]bool {
result := make(map[uint64]bool, len(ackIDs))
for _, val := range ackIDs {
if isAcked, ok := as.ackMap.Get(val); ok && isAcked {
result[val] = true
as.ackMap.Remove(val)
} else {
result[val] = false
}
}

return result
}

// Start of inMemoryAckExtension does nothing and returns nil
func (i *inMemoryAckExtension) Start(_ context.Context, _ component.Host) error {
return nil
}

// Shutdown of inMemoryAckExtension does nothing and returns nil
func (i *inMemoryAckExtension) Shutdown(_ context.Context) error {
return nil
}

// ProcessEvent marks the beginning of processing an event. It generates an ack ID for the associated partition ID.
func (i *inMemoryAckExtension) ProcessEvent(partitionID string) (ackID uint64) {
if val, ok := i.partitionMap.Get(partitionID); ok {
return val.nextAck()
}

i.partitionMap.ContainsOrAdd(partitionID, newAckPartition(i.maxNumPendingAcksPerPartition))
val, _ := i.partitionMap.Get(partitionID)
return val.nextAck()
}

// Ack acknowledges an event has been processed.
func (i *inMemoryAckExtension) Ack(partitionID string, ackID uint64) {
if val, ok := i.partitionMap.Get(partitionID); ok {
val.ack(ackID)
}
}

// QueryAcks checks the statuses of given ackIDs for a partition.
// ackIDs that are not generated from ProcessEvent or have been removed as a result of previous calls to QueryAcks will return false.
func (i *inMemoryAckExtension) QueryAcks(partitionID string, ackIDs []uint64) map[uint64]bool {
if val, ok := i.partitionMap.Get(partitionID); ok {
return val.computeAcks(ackIDs)
}

result := make(map[uint64]bool, len(ackIDs))
for _, ackID := range ackIDs {
result[ackID] = false
}

return result
}
Loading

0 comments on commit 41a9700

Please sign in to comment.