Skip to content

Commit

Permalink
[pkg/stanza] handle errors from Operator.Process (#33847)
Browse files Browse the repository at this point in the history
**Link to tracking Issue:** Fixes
#33783

**Testing:** 

nothing added

**Documentation:**

not applicable

---------

Co-authored-by: Dan Jaglowski <[email protected]>
  • Loading branch information
zeitlinger and djaglowski authored Jul 9, 2024
1 parent 562b7b6 commit d31bc2e
Show file tree
Hide file tree
Showing 20 changed files with 103 additions and 43 deletions.
27 changes: 27 additions & 0 deletions .chloggen/handle-process-errors.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "errors from Operator.Process are returned instead of silently ignored."

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33783]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: "This public function is affected: https://pkg.go.dev/github.com/open-telemetry/opentelemetry-collector-contrib/pkg/[email protected]/operator/helper#WriterOperator.Write"

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
6 changes: 5 additions & 1 deletion pkg/stanza/adapter/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/receiver/receivertest"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
Expand Down Expand Up @@ -197,7 +198,10 @@ func (b *Input) Start(_ operator.Persister) error {
return
default:
}
b.Write(ctx, b.entries[n])
err := b.Write(ctx, b.entries[n])
if err != nil {
b.Logger().Error("failed to write entry", zap.Error(err))
}
}
}()
return nil
Expand Down
4 changes: 3 additions & 1 deletion pkg/stanza/fileconsumer/internal/header/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension/experimental/storage"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline"
Expand Down Expand Up @@ -54,7 +55,8 @@ func (r *Reader) Process(ctx context.Context, token []byte, fileAttributes map[s
newEntry.Body = string(token)

if err := firstOperator.Process(ctx, newEntry); err != nil {
return fmt.Errorf("process header entry: %w", err)
r.set.Logger.Error("process header entry", zap.Error(err))
// Do not return yet. An entry was added to the logsChan which must be consumed generically.
}

ent, err := r.output.WaitForEntry(ctx)
Expand Down
6 changes: 2 additions & 4 deletions pkg/stanza/operator/helper/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,7 @@ func (p *ParserOperator) ProcessWithCallback(ctx context.Context, entry *entry.E
return p.HandleEntryError(ctx, entry, err)
}
if skip {
p.Write(ctx, entry)
return nil
return p.Write(ctx, entry)
}

if err = p.ParseWith(ctx, entry, parse); err != nil {
Expand All @@ -119,8 +118,7 @@ func (p *ParserOperator) ProcessWithCallback(ctx context.Context, entry *entry.E
}
}

p.Write(ctx, entry)
return nil
return p.Write(ctx, entry)
}

// ParseWith will process an entry's field with a parser function.
Expand Down
11 changes: 6 additions & 5 deletions pkg/stanza/operator/helper/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,13 @@ func (t *TransformerOperator) ProcessWith(ctx context.Context, entry *entry.Entr
return t.HandleEntryError(ctx, entry, err)
}
if skip {
t.Write(ctx, entry)
return nil
return t.Write(ctx, entry)
}

if err := transform(entry); err != nil {
return t.HandleEntryError(ctx, entry, err)
}
t.Write(ctx, entry)
return nil
return t.Write(ctx, entry)
}

// HandleEntryError will handle an entry error using the on_error strategy.
Expand All @@ -102,7 +100,10 @@ func (t *TransformerOperator) HandleEntryError(ctx context.Context, entry *entry
t.Logger().Error("Failed to process entry", zap.Any("error", err), zap.Any("action", t.OnError))
}
if t.OnError == SendOnError || t.OnError == SendOnErrorQuiet {
t.Write(ctx, entry)
writeErr := t.Write(ctx, entry)
if writeErr != nil {
return fmt.Errorf("failed to send entry after error: %w", writeErr)
}
}
return err
}
Expand Down
13 changes: 8 additions & 5 deletions pkg/stanza/operator/helper/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,17 @@ type WriterOperator struct {
}

// Write will write an entry to the outputs of the operator.
func (w *WriterOperator) Write(ctx context.Context, e *entry.Entry) {
for i, operator := range w.OutputOperators {
func (w *WriterOperator) Write(ctx context.Context, e *entry.Entry) error {
for i, op := range w.OutputOperators {
if i == len(w.OutputOperators)-1 {
_ = operator.Process(ctx, e)
return
return op.Process(ctx, e)
}
err := op.Process(ctx, e.Copy())
if err != nil {
return err
}
_ = operator.Process(ctx, e.Copy())
}
return nil
}

// CanOutput always returns true for a writer operator.
Expand Down
6 changes: 4 additions & 2 deletions pkg/stanza/operator/helper/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ func TestWriterOperatorWrite(t *testing.T) {
ctx := context.Background()
testEntry := entry.New()

writer.Write(ctx, testEntry)
err := writer.Write(ctx, testEntry)
require.NoError(t, err)
output1.AssertCalled(t, "Process", ctx, mock.Anything)
output2.AssertCalled(t, "Process", ctx, mock.Anything)
}
Expand All @@ -75,7 +76,8 @@ func TestWriterOperatorOutputs(t *testing.T) {
ctx := context.Background()
testEntry := entry.New()

writer.Write(ctx, testEntry)
err := writer.Write(ctx, testEntry)
require.NoError(t, err)
output1.AssertCalled(t, "Process", ctx, mock.Anything)
output2.AssertCalled(t, "Process", ctx, mock.Anything)
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/stanza/operator/input/file/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,5 @@ func (i *Input) emit(ctx context.Context, token []byte, attrs map[string]any) er
i.Logger().Error("set attribute", zap.Error(err))
}
}
i.Write(ctx, ent)
return nil
return i.Write(ctx, ent)
}
8 changes: 7 additions & 1 deletion pkg/stanza/operator/input/generate/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"sync"
"time"

"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
Expand Down Expand Up @@ -44,7 +46,11 @@ func (i *Input) Start(_ operator.Persister) error {
if !i.static {
entry.Timestamp = time.Now()
}
i.Write(ctx, entry)
err := i.Write(ctx, entry)
if err != nil {
i.Logger().Error("failed to write entry", zap.Error(err))
return
}

n++
if n == i.count {
Expand Down
6 changes: 4 additions & 2 deletions pkg/stanza/operator/input/journald/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,12 @@ func (operator *Input) Start(persister operator.Persister) error {
operator.Logger().Warn("Failed to parse journal entry", zap.Error(err))
continue
}
if err := operator.persister.Set(ctx, lastReadCursorKey, []byte(cursor)); err != nil {
if err = operator.persister.Set(ctx, lastReadCursorKey, []byte(cursor)); err != nil {
operator.Logger().Warn("Failed to set offset", zap.Error(err))
}
operator.Write(ctx, entry)
if err = operator.Write(ctx, entry); err != nil {
operator.Logger().Error("failed to write entry", zap.Error(err))
}
}
}()

Expand Down
3 changes: 1 addition & 2 deletions pkg/stanza/operator/input/namedpipe/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,5 @@ func (i *Input) sendEntry(ctx context.Context, bytes []byte) error {
return fmt.Errorf("failed to create entry: %w", err)
}

i.Write(ctx, entry)
return nil
return i.Write(ctx, entry)
}
8 changes: 6 additions & 2 deletions pkg/stanza/operator/input/stdin/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (i *Input) Start(_ operator.Persister) error {
}

if ok := scanner.Scan(); !ok {
if err := scanner.Err(); err != nil {
if err = scanner.Err(); err != nil {
i.Logger().Error("Scanning failed", zap.Error(err))
}
i.Logger().Info("Stdin has been closed")
Expand All @@ -62,7 +62,11 @@ func (i *Input) Start(_ operator.Persister) error {

e := entry.New()
e.Body = scanner.Text()
i.Write(ctx, e)
err = i.Write(ctx, e)
if err != nil {
i.Logger().Error("failed to write entry", zap.Error(err))
return
}
}
}()

Expand Down
5 changes: 4 additions & 1 deletion pkg/stanza/operator/input/tcp/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,10 @@ func (i *Input) handleMessage(ctx context.Context, conn net.Conn, dec *decode.De
}
}

i.Write(ctx, entry)
err = i.Write(ctx, entry)
if err != nil {
i.Logger().Error("Failed to write entry", zap.Error(err))
}
}

func truncateMaxLog(data []byte, maxLogSize int) (token []byte) {
Expand Down
5 changes: 4 additions & 1 deletion pkg/stanza/operator/input/udp/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,10 @@ func (i *Input) handleMessage(ctx context.Context, remoteAddr net.Addr, dec *dec
}
}

i.Write(ctx, entry)
err = i.Write(ctx, entry)
if err != nil {
i.Logger().Error("Failed to write entry", zap.Error(err))
}
}

// readMessage will read log messages from the connection.
Expand Down
9 changes: 6 additions & 3 deletions pkg/stanza/operator/parser/container/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,7 @@ func (p *Parser) Process(ctx context.Context, entry *entry.Entry) (err error) {
return p.HandleEntryError(ctx, entry, err)
}
if skip {
p.Write(ctx, entry)
return nil
return p.Write(ctx, entry)
}

if format == containerdFormat {
Expand Down Expand Up @@ -160,7 +159,11 @@ func (p *Parser) crioConsumer(ctx context.Context) {
defer p.criConsumers.Done()
for entries := range entriesChan {
for _, e := range entries {
p.Write(ctx, e)
err := p.Write(ctx, e)
if err != nil {
p.Logger().Error("failed to write entry", zap.Error(err))
return
}
}
}
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/stanza/operator/transformer/filter/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ func (t *Transformer) Process(ctx context.Context, entry *entry.Entry) error {
}

if !filtered {
t.Write(ctx, entry)
return nil
return t.Write(ctx, entry)
}

i, err := randInt(rand.Reader, upperBound)
Expand All @@ -50,7 +49,10 @@ func (t *Transformer) Process(ctx context.Context, entry *entry.Entry) error {
}

if i.Cmp(t.dropCutoff) >= 0 {
t.Write(ctx, entry)
err := t.Write(ctx, entry)
if err != nil {
return err
}
}

return nil
Expand Down
3 changes: 1 addition & 2 deletions pkg/stanza/operator/transformer/noop/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,5 @@ type Transformer struct {

// Process will forward the entry to the next output without any alterations.
func (t *Transformer) Process(ctx context.Context, entry *entry.Entry) error {
t.Write(ctx, entry)
return nil
return t.Write(ctx, entry)
}
4 changes: 2 additions & 2 deletions pkg/stanza/operator/transformer/recombine/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,9 @@ func (t *Transformer) flushSource(ctx context.Context, source string) error {
return err
}

t.Write(ctx, batch.baseEntry)
err = t.Write(ctx, batch.baseEntry)
t.removeBatch(source)
return nil
return err
}

// addNewBatch creates a new batch for the given source and adds the entry to it.
Expand Down
8 changes: 6 additions & 2 deletions pkg/stanza/operator/transformer/router/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,17 @@ func (t *Transformer) Process(ctx context.Context, entry *entry.Entry) error {

// we compile the expression with "AsBool", so this should be safe
if matches.(bool) {
if err := route.Attribute(entry); err != nil {
if err = route.Attribute(entry); err != nil {
t.Logger().Error("Failed to label entry", zap.Error(err))
return err
}

for _, output := range route.OutputOperators {
_ = output.Process(ctx, entry)
err = output.Process(ctx, entry)
if err != nil {
t.Logger().Error("Failed to process entry", zap.Error(err))
return err
}
}
break
}
Expand Down
3 changes: 1 addition & 2 deletions processor/logstransformprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,7 @@ func (t *laggyOperator) Process(ctx context.Context, e *entry.Entry) error {

t.logsCount++

t.Write(ctx, e)
return nil
return t.Write(ctx, e)
}

func (t *laggyOperator) CanProcess() bool {
Expand Down

0 comments on commit d31bc2e

Please sign in to comment.