diff --git a/.chloggen/handle-process-errors.yaml b/.chloggen/handle-process-errors.yaml new file mode 100644 index 000000000000..fe3985694ccc --- /dev/null +++ b/.chloggen/handle-process-errors.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: stanza + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "errors from Operator.Process are returned instead of silently ignored." + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [33783] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: "This public function is affected: https://pkg.go.dev/github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza@v0.104.0/operator/helper#WriterOperator.Write" + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/pkg/stanza/adapter/benchmark_test.go b/pkg/stanza/adapter/benchmark_test.go index 1a31389426e2..b2ed4be6554b 100644 --- a/pkg/stanza/adapter/benchmark_test.go +++ b/pkg/stanza/adapter/benchmark_test.go @@ -15,6 +15,7 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/receiver/receivertest" + "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" @@ -197,7 +198,10 @@ func (b *Input) Start(_ operator.Persister) error { return default: } - b.Write(ctx, b.entries[n]) + err := b.Write(ctx, b.entries[n]) + if err != nil { + b.Logger().Error("failed to write entry", zap.Error(err)) + } } }() return nil diff --git a/pkg/stanza/fileconsumer/internal/header/reader.go b/pkg/stanza/fileconsumer/internal/header/reader.go index 27a81c338bdb..5103313e9621 100644 --- a/pkg/stanza/fileconsumer/internal/header/reader.go +++ b/pkg/stanza/fileconsumer/internal/header/reader.go @@ -10,6 +10,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/extension/experimental/storage" + "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/pipeline" @@ -54,7 +55,8 @@ func (r *Reader) Process(ctx context.Context, token []byte, fileAttributes map[s newEntry.Body = string(token) if err := firstOperator.Process(ctx, newEntry); err != nil { - return fmt.Errorf("process header entry: %w", err) + r.set.Logger.Error("process header entry", zap.Error(err)) + // Do not return yet. An entry was added to the logsChan which must be consumed generically. } ent, err := r.output.WaitForEntry(ctx) diff --git a/pkg/stanza/operator/helper/parser.go b/pkg/stanza/operator/helper/parser.go index 84ba71035b8e..56b2eaccd106 100644 --- a/pkg/stanza/operator/helper/parser.go +++ b/pkg/stanza/operator/helper/parser.go @@ -105,8 +105,7 @@ func (p *ParserOperator) ProcessWithCallback(ctx context.Context, entry *entry.E return p.HandleEntryError(ctx, entry, err) } if skip { - p.Write(ctx, entry) - return nil + return p.Write(ctx, entry) } if err = p.ParseWith(ctx, entry, parse); err != nil { @@ -119,8 +118,7 @@ func (p *ParserOperator) ProcessWithCallback(ctx context.Context, entry *entry.E } } - p.Write(ctx, entry) - return nil + return p.Write(ctx, entry) } // ParseWith will process an entry's field with a parser function. diff --git a/pkg/stanza/operator/helper/transformer.go b/pkg/stanza/operator/helper/transformer.go index a62eba5ca83f..65f7a71ddf40 100644 --- a/pkg/stanza/operator/helper/transformer.go +++ b/pkg/stanza/operator/helper/transformer.go @@ -83,15 +83,13 @@ func (t *TransformerOperator) ProcessWith(ctx context.Context, entry *entry.Entr return t.HandleEntryError(ctx, entry, err) } if skip { - t.Write(ctx, entry) - return nil + return t.Write(ctx, entry) } if err := transform(entry); err != nil { return t.HandleEntryError(ctx, entry, err) } - t.Write(ctx, entry) - return nil + return t.Write(ctx, entry) } // HandleEntryError will handle an entry error using the on_error strategy. @@ -102,7 +100,10 @@ func (t *TransformerOperator) HandleEntryError(ctx context.Context, entry *entry t.Logger().Error("Failed to process entry", zap.Any("error", err), zap.Any("action", t.OnError)) } if t.OnError == SendOnError || t.OnError == SendOnErrorQuiet { - t.Write(ctx, entry) + writeErr := t.Write(ctx, entry) + if writeErr != nil { + return fmt.Errorf("failed to send entry after error: %w", writeErr) + } } return err } diff --git a/pkg/stanza/operator/helper/writer.go b/pkg/stanza/operator/helper/writer.go index 89666ae88c91..c34b859a3cbf 100644 --- a/pkg/stanza/operator/helper/writer.go +++ b/pkg/stanza/operator/helper/writer.go @@ -47,14 +47,17 @@ type WriterOperator struct { } // Write will write an entry to the outputs of the operator. -func (w *WriterOperator) Write(ctx context.Context, e *entry.Entry) { - for i, operator := range w.OutputOperators { +func (w *WriterOperator) Write(ctx context.Context, e *entry.Entry) error { + for i, op := range w.OutputOperators { if i == len(w.OutputOperators)-1 { - _ = operator.Process(ctx, e) - return + return op.Process(ctx, e) + } + err := op.Process(ctx, e.Copy()) + if err != nil { + return err } - _ = operator.Process(ctx, e.Copy()) } + return nil } // CanOutput always returns true for a writer operator. diff --git a/pkg/stanza/operator/helper/writer_test.go b/pkg/stanza/operator/helper/writer_test.go index 271f01e1032d..cec2bdba5768 100644 --- a/pkg/stanza/operator/helper/writer_test.go +++ b/pkg/stanza/operator/helper/writer_test.go @@ -53,7 +53,8 @@ func TestWriterOperatorWrite(t *testing.T) { ctx := context.Background() testEntry := entry.New() - writer.Write(ctx, testEntry) + err := writer.Write(ctx, testEntry) + require.NoError(t, err) output1.AssertCalled(t, "Process", ctx, mock.Anything) output2.AssertCalled(t, "Process", ctx, mock.Anything) } @@ -75,7 +76,8 @@ func TestWriterOperatorOutputs(t *testing.T) { ctx := context.Background() testEntry := entry.New() - writer.Write(ctx, testEntry) + err := writer.Write(ctx, testEntry) + require.NoError(t, err) output1.AssertCalled(t, "Process", ctx, mock.Anything) output2.AssertCalled(t, "Process", ctx, mock.Anything) } diff --git a/pkg/stanza/operator/input/file/input.go b/pkg/stanza/operator/input/file/input.go index ce20ef18cc55..fd845499aeae 100644 --- a/pkg/stanza/operator/input/file/input.go +++ b/pkg/stanza/operator/input/file/input.go @@ -51,6 +51,5 @@ func (i *Input) emit(ctx context.Context, token []byte, attrs map[string]any) er i.Logger().Error("set attribute", zap.Error(err)) } } - i.Write(ctx, ent) - return nil + return i.Write(ctx, ent) } diff --git a/pkg/stanza/operator/input/generate/input.go b/pkg/stanza/operator/input/generate/input.go index a07be4ff16bd..31edb2414121 100644 --- a/pkg/stanza/operator/input/generate/input.go +++ b/pkg/stanza/operator/input/generate/input.go @@ -9,6 +9,8 @@ import ( "sync" "time" + "go.uber.org/zap" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" @@ -44,7 +46,11 @@ func (i *Input) Start(_ operator.Persister) error { if !i.static { entry.Timestamp = time.Now() } - i.Write(ctx, entry) + err := i.Write(ctx, entry) + if err != nil { + i.Logger().Error("failed to write entry", zap.Error(err)) + return + } n++ if n == i.count { diff --git a/pkg/stanza/operator/input/journald/input.go b/pkg/stanza/operator/input/journald/input.go index f7af8c3ad6e5..2e4f80591451 100644 --- a/pkg/stanza/operator/input/journald/input.go +++ b/pkg/stanza/operator/input/journald/input.go @@ -150,10 +150,12 @@ func (operator *Input) Start(persister operator.Persister) error { operator.Logger().Warn("Failed to parse journal entry", zap.Error(err)) continue } - if err := operator.persister.Set(ctx, lastReadCursorKey, []byte(cursor)); err != nil { + if err = operator.persister.Set(ctx, lastReadCursorKey, []byte(cursor)); err != nil { operator.Logger().Warn("Failed to set offset", zap.Error(err)) } - operator.Write(ctx, entry) + if err = operator.Write(ctx, entry); err != nil { + operator.Logger().Error("failed to write entry", zap.Error(err)) + } } }() diff --git a/pkg/stanza/operator/input/namedpipe/input.go b/pkg/stanza/operator/input/namedpipe/input.go index c659dbc4924a..8dc89150d1f9 100644 --- a/pkg/stanza/operator/input/namedpipe/input.go +++ b/pkg/stanza/operator/input/namedpipe/input.go @@ -139,6 +139,5 @@ func (i *Input) sendEntry(ctx context.Context, bytes []byte) error { return fmt.Errorf("failed to create entry: %w", err) } - i.Write(ctx, entry) - return nil + return i.Write(ctx, entry) } diff --git a/pkg/stanza/operator/input/stdin/input.go b/pkg/stanza/operator/input/stdin/input.go index faf05b566776..298a46217459 100644 --- a/pkg/stanza/operator/input/stdin/input.go +++ b/pkg/stanza/operator/input/stdin/input.go @@ -53,7 +53,7 @@ func (i *Input) Start(_ operator.Persister) error { } if ok := scanner.Scan(); !ok { - if err := scanner.Err(); err != nil { + if err = scanner.Err(); err != nil { i.Logger().Error("Scanning failed", zap.Error(err)) } i.Logger().Info("Stdin has been closed") @@ -62,7 +62,11 @@ func (i *Input) Start(_ operator.Persister) error { e := entry.New() e.Body = scanner.Text() - i.Write(ctx, e) + err = i.Write(ctx, e) + if err != nil { + i.Logger().Error("failed to write entry", zap.Error(err)) + return + } } }() diff --git a/pkg/stanza/operator/input/tcp/input.go b/pkg/stanza/operator/input/tcp/input.go index 9f7feff86ac8..f5f0af8fd483 100644 --- a/pkg/stanza/operator/input/tcp/input.go +++ b/pkg/stanza/operator/input/tcp/input.go @@ -188,7 +188,10 @@ func (i *Input) handleMessage(ctx context.Context, conn net.Conn, dec *decode.De } } - i.Write(ctx, entry) + err = i.Write(ctx, entry) + if err != nil { + i.Logger().Error("Failed to write entry", zap.Error(err)) + } } func truncateMaxLog(data []byte, maxLogSize int) (token []byte) { diff --git a/pkg/stanza/operator/input/udp/input.go b/pkg/stanza/operator/input/udp/input.go index e064dd528ed5..f718e187f15b 100644 --- a/pkg/stanza/operator/input/udp/input.go +++ b/pkg/stanza/operator/input/udp/input.go @@ -220,7 +220,10 @@ func (i *Input) handleMessage(ctx context.Context, remoteAddr net.Addr, dec *dec } } - i.Write(ctx, entry) + err = i.Write(ctx, entry) + if err != nil { + i.Logger().Error("Failed to write entry", zap.Error(err)) + } } // readMessage will read log messages from the connection. diff --git a/pkg/stanza/operator/parser/container/parser.go b/pkg/stanza/operator/parser/container/parser.go index 2df3a636577b..af640c4a5d34 100644 --- a/pkg/stanza/operator/parser/container/parser.go +++ b/pkg/stanza/operator/parser/container/parser.go @@ -110,8 +110,7 @@ func (p *Parser) Process(ctx context.Context, entry *entry.Entry) (err error) { return p.HandleEntryError(ctx, entry, err) } if skip { - p.Write(ctx, entry) - return nil + return p.Write(ctx, entry) } if format == containerdFormat { @@ -160,7 +159,11 @@ func (p *Parser) crioConsumer(ctx context.Context) { defer p.criConsumers.Done() for entries := range entriesChan { for _, e := range entries { - p.Write(ctx, e) + err := p.Write(ctx, e) + if err != nil { + p.Logger().Error("failed to write entry", zap.Error(err)) + return + } } } } diff --git a/pkg/stanza/operator/transformer/filter/transformer.go b/pkg/stanza/operator/transformer/filter/transformer.go index d8fab48e7a03..b92a3225ba64 100644 --- a/pkg/stanza/operator/transformer/filter/transformer.go +++ b/pkg/stanza/operator/transformer/filter/transformer.go @@ -40,8 +40,7 @@ func (t *Transformer) Process(ctx context.Context, entry *entry.Entry) error { } if !filtered { - t.Write(ctx, entry) - return nil + return t.Write(ctx, entry) } i, err := randInt(rand.Reader, upperBound) @@ -50,7 +49,10 @@ func (t *Transformer) Process(ctx context.Context, entry *entry.Entry) error { } if i.Cmp(t.dropCutoff) >= 0 { - t.Write(ctx, entry) + err := t.Write(ctx, entry) + if err != nil { + return err + } } return nil diff --git a/pkg/stanza/operator/transformer/noop/transformer.go b/pkg/stanza/operator/transformer/noop/transformer.go index 1656a3557701..e62ff5ee6af8 100644 --- a/pkg/stanza/operator/transformer/noop/transformer.go +++ b/pkg/stanza/operator/transformer/noop/transformer.go @@ -17,6 +17,5 @@ type Transformer struct { // Process will forward the entry to the next output without any alterations. func (t *Transformer) Process(ctx context.Context, entry *entry.Entry) error { - t.Write(ctx, entry) - return nil + return t.Write(ctx, entry) } diff --git a/pkg/stanza/operator/transformer/recombine/transformer.go b/pkg/stanza/operator/transformer/recombine/transformer.go index a33f59d317f3..787ba03ee5cc 100644 --- a/pkg/stanza/operator/transformer/recombine/transformer.go +++ b/pkg/stanza/operator/transformer/recombine/transformer.go @@ -223,9 +223,9 @@ func (t *Transformer) flushSource(ctx context.Context, source string) error { return err } - t.Write(ctx, batch.baseEntry) + err = t.Write(ctx, batch.baseEntry) t.removeBatch(source) - return nil + return err } // addNewBatch creates a new batch for the given source and adds the entry to it. diff --git a/pkg/stanza/operator/transformer/router/transformer.go b/pkg/stanza/operator/transformer/router/transformer.go index 93fb3da427d8..099828bdea0b 100644 --- a/pkg/stanza/operator/transformer/router/transformer.go +++ b/pkg/stanza/operator/transformer/router/transformer.go @@ -48,13 +48,17 @@ func (t *Transformer) Process(ctx context.Context, entry *entry.Entry) error { // we compile the expression with "AsBool", so this should be safe if matches.(bool) { - if err := route.Attribute(entry); err != nil { + if err = route.Attribute(entry); err != nil { t.Logger().Error("Failed to label entry", zap.Error(err)) return err } for _, output := range route.OutputOperators { - _ = output.Process(ctx, entry) + err = output.Process(ctx, entry) + if err != nil { + t.Logger().Error("Failed to process entry", zap.Error(err)) + return err + } } break } diff --git a/processor/logstransformprocessor/processor_test.go b/processor/logstransformprocessor/processor_test.go index a41081061fd0..f1d599a22da0 100644 --- a/processor/logstransformprocessor/processor_test.go +++ b/processor/logstransformprocessor/processor_test.go @@ -213,8 +213,7 @@ func (t *laggyOperator) Process(ctx context.Context, e *entry.Entry) error { t.logsCount++ - t.Write(ctx, e) - return nil + return t.Write(ctx, e) } func (t *laggyOperator) CanProcess() bool {