diff --git a/.chloggen/sized-queue.yaml b/.chloggen/sized-queue.yaml new file mode 100644 index 00000000000..5b9bb3528f3 --- /dev/null +++ b/.chloggen/sized-queue.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Change the memory queue implementation to not pre-allocate capacity objects. + +# One or more tracking issues or pull requests related to the change +issues: [12070] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: This change improves memory usage of the collector under low utilization and is a prerequisite for supporting different other size limitations (number of items, bytes). + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/exporterqueue/bounded_memory_queue.go b/exporter/exporterqueue/bounded_memory_queue.go index 7cb48ec77c4..7543958b2a1 100644 --- a/exporter/exporterqueue/bounded_memory_queue.go +++ b/exporter/exporterqueue/bounded_memory_queue.go @@ -16,7 +16,7 @@ import ( // the producer are dropped. type boundedMemoryQueue[T any] struct { component.StartFunc - *sizedChannel[memQueueEl[T]] + *sizedQueue[T] } // memoryQueueSettings defines internal parameters for boundedMemoryQueue creation. @@ -29,40 +29,15 @@ type memoryQueueSettings[T any] struct { // callback for dropped items (e.g. useful to emit metrics). func newBoundedMemoryQueue[T any](set memoryQueueSettings[T]) Queue[T] { return &boundedMemoryQueue[T]{ - sizedChannel: newSizedChannel[memQueueEl[T]](set.capacity, memQueueElSizer[T]{sizer: set.sizer}), + sizedQueue: newSizedQueue[T](set.capacity, set.sizer), } } -// Offer is used by the producer to submit new item to the queue. Calling this method on a stopped queue will panic. -func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error { - return q.sizedChannel.push(memQueueEl[T]{ctx: ctx, req: req}) -} - func (q *boundedMemoryQueue[T]) Read(_ context.Context) (uint64, context.Context, T, bool) { - item, ok := q.sizedChannel.pop() - return 0, item.ctx, item.req, ok + ctx, req, ok := q.sizedQueue.pop() + return 0, ctx, req, ok } // OnProcessingFinished should be called to remove the item of the given index from the queue once processing is finished. // For in memory queue, this function is noop. -func (q *boundedMemoryQueue[T]) OnProcessingFinished(uint64, error) { -} - -// Shutdown closes the queue channel to initiate draining of the queue. -func (q *boundedMemoryQueue[T]) Shutdown(context.Context) error { - q.sizedChannel.shutdown() - return nil -} - -type memQueueEl[T any] struct { - req T - ctx context.Context -} - -type memQueueElSizer[T any] struct { - sizer sizer[T] -} - -func (mqes memQueueElSizer[T]) Sizeof(el memQueueEl[T]) int64 { - return mqes.sizer.Sizeof(el.req) -} +func (q *boundedMemoryQueue[T]) OnProcessingFinished(uint64, error) {} diff --git a/exporter/exporterqueue/sized_channel.go b/exporter/exporterqueue/sized_channel.go deleted file mode 100644 index f27df7e295e..00000000000 --- a/exporter/exporterqueue/sized_channel.go +++ /dev/null @@ -1,71 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporterqueue" - -import ( - "errors" - "sync/atomic" -) - -var errInvalidSize = errors.New("invalid element size") - -// sizedChannel is a channel wrapper for sized elements with a capacity set to a total size of all the elements. -// The channel will accept elements until the total size of the elements reaches the capacity. -type sizedChannel[T any] struct { - sizer sizer[T] - used *atomic.Int64 - ch chan T -} - -// newSizedChannel creates a sized elements channel. Each element is assigned a positive size by the provided sizer. -// capacity is the total capacity of the queue. -func newSizedChannel[T any](capacity int64, sizer sizer[T]) *sizedChannel[T] { - return &sizedChannel[T]{ - sizer: sizer, - used: &atomic.Int64{}, - ch: make(chan T, capacity), - } -} - -// push puts the element into the queue with the given sized if there is enough capacity. -// Returns an error if the queue is full. -func (vcq *sizedChannel[T]) push(el T) error { - elSize := vcq.sizer.Sizeof(el) - if elSize <= 0 { - return errInvalidSize - } - if vcq.used.Add(elSize) > int64(cap(vcq.ch)) { - vcq.used.Add(-elSize) - return ErrQueueIsFull - } - - vcq.ch <- el - return nil -} - -// pop removes the element from the queue and returns it. -// The call blocks until there is an item available or the queue is stopped. -// The function returns true when an item is consumed or false if the queue is stopped and emptied. -func (vcq *sizedChannel[T]) pop() (T, bool) { - el, ok := <-vcq.ch - if !ok { - return el, false - } - - vcq.used.Add(-vcq.sizer.Sizeof(el)) - return el, true -} - -// shutdown closes the queue channel to initiate draining of the queue. -func (vcq *sizedChannel[T]) shutdown() { - close(vcq.ch) -} - -func (vcq *sizedChannel[T]) Size() int { - return int(vcq.used.Load()) -} - -func (vcq *sizedChannel[T]) Capacity() int { - return cap(vcq.ch) -} diff --git a/exporter/exporterqueue/sized_channel_test.go b/exporter/exporterqueue/sized_channel_test.go deleted file mode 100644 index 0ad73975b99..00000000000 --- a/exporter/exporterqueue/sized_channel_test.go +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package exporterqueue - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -type sizerInt struct{} - -func (s sizerInt) Sizeof(el int) int64 { - return int64(el) -} - -func TestSizedChannel(t *testing.T) { - q := newSizedChannel[int](7, sizerInt{}) - require.NoError(t, q.push(1)) - assert.Equal(t, 1, q.Size()) - assert.Equal(t, 7, q.Capacity()) - - require.NoError(t, q.push(3)) - assert.Equal(t, 4, q.Size()) - - // should not be able to send to the full queue - require.Error(t, q.push(4)) - assert.Equal(t, 4, q.Size()) - - el, ok := q.pop() - assert.Equal(t, 1, el) - assert.True(t, ok) - assert.Equal(t, 3, q.Size()) - - el, ok = q.pop() - assert.Equal(t, 3, el) - assert.True(t, ok) - assert.Equal(t, 0, q.Size()) - - q.shutdown() - el, ok = q.pop() - assert.False(t, ok) - assert.Equal(t, 0, el) -} - -func TestSizedChannel_OfferInvalidSize(t *testing.T) { - q := newSizedChannel[int](1, sizerInt{}) - require.ErrorIs(t, q.push(0), errInvalidSize) -} diff --git a/exporter/exporterqueue/sized_queue.go b/exporter/exporterqueue/sized_queue.go new file mode 100644 index 00000000000..a9b58241c28 --- /dev/null +++ b/exporter/exporterqueue/sized_queue.go @@ -0,0 +1,138 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporterqueue" + +import ( + "context" + "errors" + "sync" +) + +var errInvalidSize = errors.New("invalid element size") + +type node[T any] struct { + ctx context.Context + data T + size int64 + next *node[T] +} + +type linkedQueue[T any] struct { + head *node[T] + tail *node[T] +} + +func (l *linkedQueue[T]) push(ctx context.Context, data T, size int64) { + n := &node[T]{ctx: ctx, data: data, size: size} + if l.tail == nil { + l.head = n + l.tail = n + return + } + l.tail.next = n + l.tail = n +} + +func (l *linkedQueue[T]) pop() (context.Context, T, int64) { + n := l.head + l.head = n.next + if l.head == nil { + l.tail = nil + } + n.next = nil + return n.ctx, n.data, n.size +} + +// sizedQueue is a channel wrapper for sized elements with a capacity set to a total size of all the elements. +// The channel will accept elements until the total size of the elements reaches the capacity. +type sizedQueue[T any] struct { + sizer sizer[T] + cap int64 + + mu sync.Mutex + hasElements *sync.Cond + items *linkedQueue[T] + size int64 + stopped bool +} + +// newSizedQueue creates a sized elements channel. Each element is assigned a size by the provided sizer. +// capacity is the capacity of the queue. +func newSizedQueue[T any](capacity int64, sizer sizer[T]) *sizedQueue[T] { + sq := &sizedQueue[T]{ + sizer: sizer, + cap: capacity, + items: &linkedQueue[T]{}, + } + sq.hasElements = sync.NewCond(&sq.mu) + return sq +} + +// Offer puts the element into the queue with the given sized if there is enough capacity. +// Returns an error if the queue is full. +func (sq *sizedQueue[T]) Offer(ctx context.Context, el T) error { + elSize := sq.sizer.Sizeof(el) + if elSize == 0 { + return nil + } + + if elSize <= 0 { + return errInvalidSize + } + + sq.mu.Lock() + defer sq.mu.Unlock() + + if sq.size+elSize > sq.cap { + return ErrQueueIsFull + } + + sq.size += elSize + sq.items.push(ctx, el, elSize) + // Signal one consumer if any. + sq.hasElements.Signal() + return nil +} + +// pop removes the element from the queue and returns it. +// The call blocks until there is an item available or the queue is stopped. +// The function returns true when an item is consumed or false if the queue is stopped and emptied. +func (sq *sizedQueue[T]) pop() (context.Context, T, bool) { + sq.mu.Lock() + defer sq.mu.Unlock() + + for { + if sq.size > 0 { + ctx, el, elSize := sq.items.pop() + sq.size -= elSize + return ctx, el, true + } + + if sq.stopped { + var el T + return context.Background(), el, false + } + + sq.hasElements.Wait() + } +} + +// Shutdown closes the queue channel to initiate draining of the queue. +func (sq *sizedQueue[T]) Shutdown(context.Context) error { + sq.mu.Lock() + defer sq.mu.Unlock() + sq.stopped = true + sq.hasElements.Broadcast() + return nil +} + +func (sq *sizedQueue[T]) Size() int { + sq.mu.Lock() + defer sq.mu.Unlock() + return int(sq.size) +} + +func (sq *sizedQueue[T]) Capacity() int { + return int(sq.cap) +} diff --git a/exporter/exporterqueue/sized_queue_test.go b/exporter/exporterqueue/sized_queue_test.go new file mode 100644 index 00000000000..4fa5e81dee8 --- /dev/null +++ b/exporter/exporterqueue/sized_queue_test.go @@ -0,0 +1,83 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterqueue + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type sizerInt struct{} + +func (s sizerInt) Sizeof(el int) int64 { + return int64(el) +} + +func TestSizedQueue(t *testing.T) { + q := newSizedQueue[int](7, sizerInt{}) + require.NoError(t, q.Offer(context.Background(), 1)) + assert.Equal(t, 1, q.Size()) + assert.Equal(t, 7, q.Capacity()) + + require.NoError(t, q.Offer(context.Background(), 3)) + assert.Equal(t, 4, q.Size()) + + // should not be able to send to the full queue + require.Error(t, q.Offer(context.Background(), 4)) + assert.Equal(t, 4, q.Size()) + + _, el, ok := q.pop() + assert.Equal(t, 1, el) + assert.True(t, ok) + assert.Equal(t, 3, q.Size()) + + _, el, ok = q.pop() + assert.Equal(t, 3, el) + assert.True(t, ok) + assert.Equal(t, 0, q.Size()) + + require.NoError(t, q.Shutdown(context.Background())) + _, el, ok = q.pop() + assert.False(t, ok) + assert.Equal(t, 0, el) +} + +func TestSizedQueue_DrainAllElements(t *testing.T) { + q := newSizedQueue[int](7, sizerInt{}) + require.NoError(t, q.Offer(context.Background(), 1)) + require.NoError(t, q.Offer(context.Background(), 3)) + + _, el, ok := q.pop() + assert.Equal(t, 1, el) + assert.True(t, ok) + assert.Equal(t, 3, q.Size()) + + require.NoError(t, q.Shutdown(context.Background())) + _, el, ok = q.pop() + assert.Equal(t, 3, el) + assert.True(t, ok) + assert.Equal(t, 0, q.Size()) + + _, el, ok = q.pop() + assert.False(t, ok) + assert.Equal(t, 0, el) +} + +func TestSizedChannel_OfferInvalidSize(t *testing.T) { + q := newSizedQueue[int](1, sizerInt{}) + require.ErrorIs(t, q.Offer(context.Background(), -1), errInvalidSize) +} + +func TestSizedChannel_OfferZeroSize(t *testing.T) { + q := newSizedQueue[int](1, sizerInt{}) + require.NoError(t, q.Offer(context.Background(), 0)) + require.NoError(t, q.Shutdown(context.Background())) + // Because the size 0 is ignored, nothing to drain. + _, el, ok := q.pop() + assert.False(t, ok) + assert.Equal(t, 0, el) +}