Skip to content

Commit

Permalink
Change the memory queue implementation to not pre-allocate capacity o…
Browse files Browse the repository at this point in the history
…bjects.

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Jan 11, 2025
1 parent e6df9d8 commit 2144722
Show file tree
Hide file tree
Showing 6 changed files with 251 additions and 152 deletions.
25 changes: 25 additions & 0 deletions .chloggen/sized-queue.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Change the memory queue implementation to not pre-allocate capacity objects.

# One or more tracking issues or pull requests related to the change
issues: [12070]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: This change improves memory usage of the collector under low utilization and is a prerequisite for supporting different other size limitations (number of items, bytes).

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
35 changes: 5 additions & 30 deletions exporter/exporterqueue/bounded_memory_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
// the producer are dropped.
type boundedMemoryQueue[T any] struct {
component.StartFunc
*sizedChannel[memQueueEl[T]]
*sizedQueue[T]
}

// memoryQueueSettings defines internal parameters for boundedMemoryQueue creation.
Expand All @@ -29,40 +29,15 @@ type memoryQueueSettings[T any] struct {
// callback for dropped items (e.g. useful to emit metrics).
func newBoundedMemoryQueue[T any](set memoryQueueSettings[T]) Queue[T] {
return &boundedMemoryQueue[T]{
sizedChannel: newSizedChannel[memQueueEl[T]](set.capacity, memQueueElSizer[T]{sizer: set.sizer}),
sizedQueue: newSizedQueue[T](set.capacity, set.sizer),
}
}

// Offer is used by the producer to submit new item to the queue. Calling this method on a stopped queue will panic.
func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error {
return q.sizedChannel.push(memQueueEl[T]{ctx: ctx, req: req})
}

func (q *boundedMemoryQueue[T]) Read(_ context.Context) (uint64, context.Context, T, bool) {
item, ok := q.sizedChannel.pop()
return 0, item.ctx, item.req, ok
ctx, req, ok := q.sizedQueue.pop()
return 0, ctx, req, ok
}

// OnProcessingFinished should be called to remove the item of the given index from the queue once processing is finished.
// For in memory queue, this function is noop.
func (q *boundedMemoryQueue[T]) OnProcessingFinished(uint64, error) {
}

// Shutdown closes the queue channel to initiate draining of the queue.
func (q *boundedMemoryQueue[T]) Shutdown(context.Context) error {
q.sizedChannel.shutdown()
return nil
}

type memQueueEl[T any] struct {
req T
ctx context.Context
}

type memQueueElSizer[T any] struct {
sizer sizer[T]
}

func (mqes memQueueElSizer[T]) Sizeof(el memQueueEl[T]) int64 {
return mqes.sizer.Sizeof(el.req)
}
func (q *boundedMemoryQueue[T]) OnProcessingFinished(uint64, error) {}
71 changes: 0 additions & 71 deletions exporter/exporterqueue/sized_channel.go

This file was deleted.

51 changes: 0 additions & 51 deletions exporter/exporterqueue/sized_channel_test.go

This file was deleted.

138 changes: 138 additions & 0 deletions exporter/exporterqueue/sized_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporterqueue"

import (
"context"
"errors"
"sync"
)

var errInvalidSize = errors.New("invalid element size")

type node[T any] struct {
ctx context.Context
data T
size int64
next *node[T]
}

type linkedQueue[T any] struct {
head *node[T]
tail *node[T]
}

func (l *linkedQueue[T]) push(ctx context.Context, data T, size int64) {
n := &node[T]{ctx: ctx, data: data, size: size}
if l.tail == nil {
l.head = n
l.tail = n
return
}
l.tail.next = n
l.tail = n
}

func (l *linkedQueue[T]) pop() (context.Context, T, int64) {
n := l.head
l.head = n.next
if l.head == nil {
l.tail = nil
}
n.next = nil
return n.ctx, n.data, n.size
}

// sizedQueue is a channel wrapper for sized elements with a capacity set to a total size of all the elements.
// The channel will accept elements until the total size of the elements reaches the capacity.
type sizedQueue[T any] struct {
sizer sizer[T]
cap int64

mu sync.Mutex
hasElements *sync.Cond
items *linkedQueue[T]
size int64
stopped bool
}

// newSizedQueue creates a sized elements channel. Each element is assigned a size by the provided sizer.
// capacity is the capacity of the queue.
func newSizedQueue[T any](capacity int64, sizer sizer[T]) *sizedQueue[T] {
sq := &sizedQueue[T]{
sizer: sizer,
cap: capacity,
items: &linkedQueue[T]{},
}
sq.hasElements = sync.NewCond(&sq.mu)
return sq
}

// Offer puts the element into the queue with the given sized if there is enough capacity.
// Returns an error if the queue is full.
func (sq *sizedQueue[T]) Offer(ctx context.Context, el T) error {
elSize := sq.sizer.Sizeof(el)
if elSize == 0 {
return nil
}

if elSize <= 0 {
return errInvalidSize
}

sq.mu.Lock()
defer sq.mu.Unlock()

if sq.size+elSize > sq.cap {
return ErrQueueIsFull
}

sq.size += elSize
sq.items.push(ctx, el, elSize)
// Signal one consumer if any.
sq.hasElements.Signal()
return nil
}

// pop removes the element from the queue and returns it.
// The call blocks until there is an item available or the queue is stopped.
// The function returns true when an item is consumed or false if the queue is stopped and emptied.
func (sq *sizedQueue[T]) pop() (context.Context, T, bool) {
sq.mu.Lock()
defer sq.mu.Unlock()

for {
if sq.size > 0 {
ctx, el, elSize := sq.items.pop()
sq.size -= elSize
return ctx, el, true
}

if sq.stopped {
var el T
return context.Background(), el, false
}

sq.hasElements.Wait()
}
}

// Shutdown closes the queue channel to initiate draining of the queue.
func (sq *sizedQueue[T]) Shutdown(context.Context) error {
sq.mu.Lock()
defer sq.mu.Unlock()
sq.stopped = true
sq.hasElements.Broadcast()
return nil
}

func (sq *sizedQueue[T]) Size() int {
sq.mu.Lock()
defer sq.mu.Unlock()
return int(sq.size)
}

func (sq *sizedQueue[T]) Capacity() int {
return int(sq.cap)
}
Loading

0 comments on commit 2144722

Please sign in to comment.