Skip to content

Commit

Permalink
fix check stage fail
Browse files Browse the repository at this point in the history
  • Loading branch information
zpzhuSplunk committed Mar 11, 2024
1 parent 808f9a0 commit d2c2b1b
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 7 deletions.
2 changes: 1 addition & 1 deletion extension/ackextension/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,5 @@ func createDefaultConfig() component.Config {
}

func createExtension(_ context.Context, _ extension.CreateSettings, _ component.Config) (extension.Extension, error) {
return nil, nil
return newInMemoryAckExtension(), nil
}
18 changes: 17 additions & 1 deletion extension/ackextension/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@
package ackextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/ackextension"

import (
"context"
"sync"
"sync/atomic"

"go.opentelemetry.io/collector/component"
)

type InMemoryAckExtension struct {
partitionMap sync.Map
}

func NewInMemoryAckExtension() AckExtension {
func newInMemoryAckExtension() *InMemoryAckExtension {
return &InMemoryAckExtension{}
}

Expand Down Expand Up @@ -54,6 +57,17 @@ func (as *ackStatus) computeAcks(ackIDs []uint64) map[uint64]bool {
return result
}

// Start of InMemoryAckExtension does nothing and returns nil
func (i *InMemoryAckExtension) Start(_ context.Context, _ component.Host) error {
return nil
}

// Shutdown of InMemoryAckExtension does nothing and returns nil
func (i *InMemoryAckExtension) Shutdown(_ context.Context) error {
return nil
}

// ProcessEvent marks the beginning of processing an event. It generates an ack ID for the associated partition ID.
func (i *InMemoryAckExtension) ProcessEvent(partitionID string) (ackID uint64) {
if actual, loaded := i.partitionMap.LoadOrStore(partitionID, newAckStatus()); loaded {
status := actual.(*ackStatus)
Expand All @@ -63,6 +77,7 @@ func (i *InMemoryAckExtension) ProcessEvent(partitionID string) (ackID uint64) {
return 0
}

// Ack acknowledges an event has been processed.
func (i *InMemoryAckExtension) Ack(partitionID string, ackID uint64) {
if val, ok := i.partitionMap.Load(partitionID); ok {
if status, ok := val.(*ackStatus); ok {
Expand All @@ -71,6 +86,7 @@ func (i *InMemoryAckExtension) Ack(partitionID string, ackID uint64) {
}
}

// QueryAcks checks the statuses of given ackIDs for a partition.
func (i *InMemoryAckExtension) QueryAcks(partitionID string, ackIDs []uint64) map[uint64]bool {
if val, ok := i.partitionMap.Load(partitionID); ok {
if status, ok := val.(*ackStatus); ok {
Expand Down
8 changes: 3 additions & 5 deletions extension/ackextension/inmemory_test.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package ackextension_test
package ackextension

import (
"fmt"
"sync"
"testing"

"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/ackextension"
)

func TestExtensionAck(t *testing.T) {
ext := ackextension.NewInMemoryAckExtension()
ext := newInMemoryAckExtension()

// send events through different partitions
for i := 0; i < 100; i++ {
Expand Down Expand Up @@ -71,7 +69,7 @@ func TestExtensionAck(t *testing.T) {
}

func TestExtensionAckAsync(t *testing.T) {
ext := ackextension.NewInMemoryAckExtension()
ext := newInMemoryAckExtension()
partitionCount := 100
var wg sync.WaitGroup
wg.Add(partitionCount)
Expand Down

0 comments on commit d2c2b1b

Please sign in to comment.