diff --git a/.chloggen/header-extraction-kafka.yaml b/.chloggen/header-extraction-kafka.yaml new file mode 100644 index 000000000000..196a5f362a06 --- /dev/null +++ b/.chloggen/header-extraction-kafka.yaml @@ -0,0 +1,20 @@ +# Use this changelog template to create an entry for release notes. +# If your change doesn't affect end users, such as a test fix or a tooling change, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: kafkareceiver +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Allow users to attach Kafka header metadata to the log/metric/trace records in the pipeline. Introduces a new config option, 'header_extraction', with examples. +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [24367] +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/receiver/kafkareceiver/README.md b/receiver/kafkareceiver/README.md index 3f6fba151740..1f366ac729cd 100644 --- a/receiver/kafkareceiver/README.md +++ b/receiver/kafkareceiver/README.md @@ -87,7 +87,10 @@ The following settings can be optionally configured: - `after`: (default = false) If true, the messages are marked after the pipeline execution - `on_error`: (default = false) If false, only the successfully processed messages are marked **Note: this can block the entire partition in case a message processing returns a permanent error** - +- `header_extraction`: + - `extract_headers` (default = false): Allows the user to attach Kafka header fields to resource attributes in the OTel pipeline + - `headers` (default = []): List of headers to extract from the Kafka record. + **Note: Header keys are matched exactly. Regexes are not supported at this time.** Example: ```yaml @@ -95,3 +98,40 @@ receivers: kafka: protocol_version: 2.0.0 ``` + +Example of header extraction: + +```yaml +receivers: + kafka: + topic: test + header_extraction: + extract_headers: true + headers: ["header1", "header2"] +``` + +- If we feed the following Kafka record to the `test` topic and use the above config: +```yaml +{ + event: Hello, + headers: { + header1: value1, + header2: value2, + } +} +``` +we will get a log record in the Collector similar to: +```yaml +{ + ... + body: Hello, + resource: { + kafka.header.header1: value1, + kafka.header.header2: value2, + }, + ... +} +``` + +- Here the Kafka record headers `header1` and `header2` are added as resource attributes. +- Every **matching** Kafka header key is prefixed with `kafka.header.` and attached to the resource attributes.
\ No newline at end of file diff --git a/receiver/kafkareceiver/config.go b/receiver/kafkareceiver/config.go index 1ed844b2ad89..7487e06b0be6 100644 --- a/receiver/kafkareceiver/config.go +++ b/receiver/kafkareceiver/config.go @@ -31,6 +31,11 @@ type MessageMarking struct { OnError bool `mapstructure:"on_error"` } +type HeaderExtraction struct { + ExtractHeaders bool `mapstructure:"extract_headers"` + Headers []string `mapstructure:"headers"` +} + // Config defines configuration for Kafka receiver. type Config struct { // The list of kafka brokers (default localhost:9092) @@ -60,6 +65,9 @@ type Config struct { // Controls the way the messages are marked as consumed MessageMarking MessageMarking `mapstructure:"message_marking"` + + // Extract headers from kafka records + HeaderExtraction HeaderExtraction `mapstructure:"header_extraction"` } const ( diff --git a/receiver/kafkareceiver/factory.go b/receiver/kafkareceiver/factory.go index 00ff43e4fa98..cbcadf3233a8 100644 --- a/receiver/kafkareceiver/factory.go +++ b/receiver/kafkareceiver/factory.go @@ -111,6 +111,9 @@ func createDefaultConfig() component.Config { After: false, OnError: false, }, + HeaderExtraction: HeaderExtraction{ + ExtractHeaders: false, + }, } } diff --git a/receiver/kafkareceiver/header_extraction.go b/receiver/kafkareceiver/header_extraction.go new file mode 100644 index 000000000000..265c84fb33db --- /dev/null +++ b/receiver/kafkareceiver/header_extraction.go @@ -0,0 +1,94 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver" + +import ( + "fmt" + + "github.com/IBM/sarama" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/zap" +) + +func getAttribute(key string) string { + return fmt.Sprintf("kafka.header.%s", key) +} + +type HeaderExtractor interface { + extractHeadersTraces(ptrace.Traces, *sarama.ConsumerMessage) + extractHeadersMetrics(pmetric.Metrics, *sarama.ConsumerMessage) + extractHeadersLogs(plog.Logs, *sarama.ConsumerMessage) +} + +type headerExtractor struct { + logger *zap.Logger + headers []string +} + +func (he *headerExtractor) extractHeadersTraces(traces ptrace.Traces, message *sarama.ConsumerMessage) { + for _, header := range he.headers { + value, ok := getHeaderValue(message.Headers, header) + if !ok { + he.logger.Debug("Header key not found in the trace: ", zap.String("key", header)) + continue + } + for i := 0; i < traces.ResourceSpans().Len(); i++ { + rs := traces.ResourceSpans().At(i) + rs.Resource().Attributes().PutStr(getAttribute(header), value) + } + } +} + +func (he *headerExtractor) extractHeadersLogs(logs plog.Logs, message *sarama.ConsumerMessage) { + for _, header := range he.headers { + value, ok := getHeaderValue(message.Headers, header) + if !ok { + he.logger.Debug("Header key not found in the log: ", zap.String("key", header)) + continue + } + for i := 0; i < logs.ResourceLogs().Len(); i++ { + rl := logs.ResourceLogs().At(i) + rl.Resource().Attributes().PutStr(getAttribute(header), value) + } + } +} + +func (he *headerExtractor) extractHeadersMetrics(metrics pmetric.Metrics, message *sarama.ConsumerMessage) { + for _, header := range he.headers { + value, ok := getHeaderValue(message.Headers, header) + if !ok { + he.logger.Debug("Header key not found in the metric: ", zap.String("key", header)) + continue + } + for i := 0; i 
< metrics.ResourceMetrics().Len(); i++ { + rm := metrics.ResourceMetrics().At(i) + rm.Resource().Attributes().PutStr(getAttribute(header), value) + } + } +} + +func getHeaderValue(headers []*sarama.RecordHeader, header string) (string, bool) { + for _, kafkaHeader := range headers { + headerKey := string(kafkaHeader.Key) + if headerKey == header { + // matching header found + return string(kafkaHeader.Value), true + } + } + // no header found matching the key, report to the user + return "", false +} + +type nopHeaderExtractor struct{} + +func (he *nopHeaderExtractor) extractHeadersTraces(_ ptrace.Traces, _ *sarama.ConsumerMessage) { +} + +func (he *nopHeaderExtractor) extractHeadersLogs(_ plog.Logs, _ *sarama.ConsumerMessage) { +} + +func (he *nopHeaderExtractor) extractHeadersMetrics(_ pmetric.Metrics, _ *sarama.ConsumerMessage) { +} diff --git a/receiver/kafkareceiver/header_extraction_test.go b/receiver/kafkareceiver/header_extraction_test.go new file mode 100644 index 000000000000..c5219c219b55 --- /dev/null +++ b/receiver/kafkareceiver/header_extraction_test.go @@ -0,0 +1,210 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package kafkareceiver + +import ( + "context" + "sync" + "testing" + + "github.com/IBM/sarama" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/receiver/receiverhelper" + "go.opentelemetry.io/collector/receiver/receivertest" + "go.uber.org/zap/zaptest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata" +) + +func TestHeaderExtractionTraces(t *testing.T) { + obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ + ReceiverCreateSettings: receivertest.NewNopCreateSettings(), + }) + require.NoError(t, err) + nextConsumer := &consumertest.TracesSink{} + c := tracesConsumerGroupHandler{ + unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding), + logger: zaptest.NewLogger(t), + ready: make(chan bool), + nextConsumer: nextConsumer, + obsrecv: obsrecv, + } + headers := []string{"headerKey1", "headerKey2"} + c.headerExtractor = &headerExtractor{ + logger: zaptest.NewLogger(t), + headers: headers, + } + groupClaim := &testConsumerGroupClaim{ + messageChan: make(chan *sarama.ConsumerMessage), + } + ctx, cancelFunc := context.WithCancel(context.Background()) + defer close(groupClaim.messageChan) + testSession := testConsumerGroupSession{ctx: ctx} + require.NoError(t, c.Setup(testSession)) + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + err = c.ConsumeClaim(testSession, groupClaim) + for _, trace := range nextConsumer.AllTraces() { + for i := 0; i < trace.ResourceSpans().Len(); i++ { + rs := trace.ResourceSpans().At(i) + validateHeader(t, rs.Resource(), "kafka.header.headerKey1", "headerValue1") + validateHeader(t, rs.Resource(), "kafka.header.headerKey2", "headerValue2") + } + } + assert.NoError(t, err) + wg.Done() + }() + td := ptrace.NewTraces() + td.ResourceSpans().AppendEmpty().Resource() + td.ResourceSpans().At(0).ScopeSpans().AppendEmpty().Spans().AppendEmpty() + unmarshaler := &ptrace.ProtoMarshaler{} + bts, err := unmarshaler.MarshalTraces(td) + groupClaim.messageChan <- &sarama.ConsumerMessage{ + Headers: []*sarama.RecordHeader{ + { + Key: 
[]byte("headerKey1"), + Value: []byte("headerValue1"), + }, + { + Key: []byte("headerKey2"), + Value: []byte("headerValue2"), + }, + }, + Value: bts, + } + cancelFunc() + wg.Wait() + +} + +func TestHeaderExtractionLogs(t *testing.T) { + obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ + ReceiverCreateSettings: receivertest.NewNopCreateSettings(), + }) + require.NoError(t, err) + nextConsumer := &consumertest.LogsSink{} + unmarshaler := newTextLogsUnmarshaler() + unmarshaler, err = unmarshaler.WithEnc("utf-8") + c := logsConsumerGroupHandler{ + unmarshaler: unmarshaler, + logger: zaptest.NewLogger(t), + ready: make(chan bool), + nextConsumer: nextConsumer, + obsrecv: obsrecv, + } + headers := []string{"headerKey1", "headerKey2"} + c.headerExtractor = &headerExtractor{ + logger: zaptest.NewLogger(t), + headers: headers, + } + groupClaim := &testConsumerGroupClaim{ + messageChan: make(chan *sarama.ConsumerMessage), + } + ctx, cancelFunc := context.WithCancel(context.Background()) + defer close(groupClaim.messageChan) + testSession := testConsumerGroupSession{ctx: ctx} + require.NoError(t, c.Setup(testSession)) + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + err = c.ConsumeClaim(testSession, groupClaim) + for _, logs := range nextConsumer.AllLogs() { + for i := 0; i < logs.ResourceLogs().Len(); i++ { + rs := logs.ResourceLogs().At(i) + validateHeader(t, rs.Resource(), "kafka.header.headerKey1", "headerValueLog1") + validateHeader(t, rs.Resource(), "kafka.header.headerKey2", "headerValueLog2") + } + } + assert.NoError(t, err) + wg.Done() + }() + groupClaim.messageChan <- &sarama.ConsumerMessage{ + Headers: []*sarama.RecordHeader{ + { + Key: []byte("headerKey1"), + Value: []byte("headerValueLog1"), + }, + { + Key: []byte("headerKey2"), + Value: []byte("headerValueLog2"), + }, + }, + Value: []byte("Message"), + } + cancelFunc() + wg.Wait() + +} + +func TestHeaderExtractionMetrics(t *testing.T) { + obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ + ReceiverCreateSettings: receivertest.NewNopCreateSettings(), + }) + require.NoError(t, err) + nextConsumer := &consumertest.MetricsSink{} + c := metricsConsumerGroupHandler{ + unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding), + logger: zaptest.NewLogger(t), + ready: make(chan bool), + nextConsumer: nextConsumer, + obsrecv: obsrecv, + } + headers := []string{"headerKey1", "headerKey2"} + c.headerExtractor = &headerExtractor{ + logger: zaptest.NewLogger(t), + headers: headers, + } + groupClaim := &testConsumerGroupClaim{ + messageChan: make(chan *sarama.ConsumerMessage), + } + ctx, cancelFunc := context.WithCancel(context.Background()) + defer close(groupClaim.messageChan) + testSession := testConsumerGroupSession{ctx: ctx} + require.NoError(t, c.Setup(testSession)) + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + err = c.ConsumeClaim(testSession, groupClaim) + for _, metric := range nextConsumer.AllMetrics() { + for i := 0; i < metric.ResourceMetrics().Len(); i++ { + rs := metric.ResourceMetrics().At(i) + validateHeader(t, rs.Resource(), "kafka.header.headerKey1", "headerValueMetric1") + validateHeader(t, rs.Resource(), "kafka.header.headerKey2", "headerValueMetric2") + } + } + assert.NoError(t, err) + wg.Done() + }() + ld := testdata.GenerateMetricsOneMetric() + unmarshaler := &pmetric.ProtoMarshaler{} + bts, err := unmarshaler.MarshalMetrics(ld) + groupClaim.messageChan <- &sarama.ConsumerMessage{ + Headers: []*sarama.RecordHeader{ + { + Key: 
[]byte("headerKey1"), + Value: []byte("headerValueMetric1"), + }, + { + Key: []byte("headerKey2"), + Value: []byte("headerValueMetric2"), + }, + }, + Value: bts, + } + cancelFunc() + wg.Wait() + +} + +func validateHeader(t *testing.T, rs pcommon.Resource, headerKey string, headerValue string) { + val, ok := rs.Attributes().Get(headerKey) + assert.Equal(t, ok, true) + assert.Equal(t, val.Str(), headerValue) +} diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go index 2afcbb89ee9b..c8512828724f 100644 --- a/receiver/kafkareceiver/kafka_receiver.go +++ b/receiver/kafkareceiver/kafka_receiver.go @@ -40,6 +40,8 @@ type kafkaTracesConsumer struct { autocommitEnabled bool messageMarking MessageMarking + headerExtraction bool + headers []string } // kafkaMetricsConsumer uses sarama to consume and handle messages from kafka. @@ -54,6 +56,8 @@ type kafkaMetricsConsumer struct { autocommitEnabled bool messageMarking MessageMarking + headerExtraction bool + headers []string } // kafkaLogsConsumer uses sarama to consume and handle messages from kafka. @@ -68,6 +72,8 @@ type kafkaLogsConsumer struct { autocommitEnabled bool messageMarking MessageMarking + headerExtraction bool + headers []string } var _ receiver.Traces = (*kafkaTracesConsumer)(nil) @@ -114,6 +120,8 @@ func newTracesReceiver(config Config, set receiver.CreateSettings, unmarshalers settings: set, autocommitEnabled: config.AutoCommit.Enable, messageMarking: config.MessageMarking, + headerExtraction: config.HeaderExtraction.ExtractHeaders, + headers: config.HeaderExtraction.Headers, }, nil } @@ -136,6 +144,13 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) erro obsrecv: obsrecv, autocommitEnabled: c.autocommitEnabled, messageMarking: c.messageMarking, + headerExtractor: &nopHeaderExtractor{}, + } + if c.headerExtraction { + consumerGroup.headerExtractor = &headerExtractor{ + logger: c.settings.Logger, + headers: c.headers, + } } go func() { if err := c.consumeLoop(ctx, consumerGroup); err != nil { @@ -207,6 +222,8 @@ func newMetricsReceiver(config Config, set receiver.CreateSettings, unmarshalers settings: set, autocommitEnabled: config.AutoCommit.Enable, messageMarking: config.MessageMarking, + headerExtraction: config.HeaderExtraction.ExtractHeaders, + headers: config.HeaderExtraction.Headers, }, nil } @@ -229,6 +246,13 @@ func (c *kafkaMetricsConsumer) Start(_ context.Context, host component.Host) err obsrecv: obsrecv, autocommitEnabled: c.autocommitEnabled, messageMarking: c.messageMarking, + headerExtractor: &nopHeaderExtractor{}, + } + if c.headerExtraction { + metricsConsumerGroup.headerExtractor = &headerExtractor{ + logger: c.settings.Logger, + headers: c.headers, + } } go func() { if err := c.consumeLoop(ctx, metricsConsumerGroup); err != nil { @@ -300,6 +324,8 @@ func newLogsReceiver(config Config, set receiver.CreateSettings, unmarshalers ma settings: set, autocommitEnabled: config.AutoCommit.Enable, messageMarking: config.MessageMarking, + headerExtraction: config.HeaderExtraction.ExtractHeaders, + headers: config.HeaderExtraction.Headers, }, nil } @@ -350,6 +376,13 @@ func (c *kafkaLogsConsumer) Start(_ context.Context, host component.Host) error obsrecv: obsrecv, autocommitEnabled: c.autocommitEnabled, messageMarking: c.messageMarking, + headerExtractor: &nopHeaderExtractor{}, + } + if c.headerExtraction { + logsConsumerGroup.headerExtractor = &headerExtractor{ + logger: c.settings.Logger, + headers: c.headers, + } } go func() { if err := 
c.consumeLoop(ctx, logsConsumerGroup); err != nil { @@ -394,6 +427,7 @@ type tracesConsumerGroupHandler struct { autocommitEnabled bool messageMarking MessageMarking + headerExtractor HeaderExtractor } type metricsConsumerGroupHandler struct { @@ -409,6 +443,7 @@ type metricsConsumerGroupHandler struct { autocommitEnabled bool messageMarking MessageMarking + headerExtractor HeaderExtractor } type logsConsumerGroupHandler struct { @@ -424,6 +459,7 @@ type logsConsumerGroupHandler struct { autocommitEnabled bool messageMarking MessageMarking + headerExtractor HeaderExtractor } var _ sarama.ConsumerGroupHandler = (*tracesConsumerGroupHandler)(nil) @@ -480,6 +516,7 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe return err } + c.headerExtractor.extractHeadersTraces(traces, message) spanCount := traces.SpanCount() err = c.nextConsumer.ConsumeTraces(session.Context(), traces) c.obsrecv.EndTracesOp(ctx, c.unmarshaler.Encoding(), spanCount, err) @@ -554,6 +591,7 @@ func (c *metricsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupS } return err } + c.headerExtractor.extractHeadersMetrics(metrics, message) dataPointCount := metrics.DataPointCount() err = c.nextConsumer.ConsumeMetrics(session.Context(), metrics) @@ -634,7 +672,7 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess } return err } - + c.headerExtractor.extractHeadersLogs(logs, message) err = c.nextConsumer.ConsumeLogs(session.Context(), logs) // TODO c.obsrecv.EndLogsOp(ctx, c.unmarshaler.Encoding(), logs.LogRecordCount(), err) diff --git a/receiver/kafkareceiver/kafka_receiver_test.go b/receiver/kafkareceiver/kafka_receiver_test.go index 62d1a904be37..31d4edefbc30 100644 --- a/receiver/kafkareceiver/kafka_receiver_test.go +++ b/receiver/kafkareceiver/kafka_receiver_test.go @@ -139,11 +139,12 @@ func TestTracesConsumerGroupHandler(t *testing.T) { obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()}) require.NoError(t, err) c := tracesConsumerGroupHandler{ - unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding), - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewNop(), - obsrecv: obsrecv, + unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding), + logger: zap.NewNop(), + ready: make(chan bool), + nextConsumer: consumertest.NewNop(), + obsrecv: obsrecv, + headerExtractor: &nopHeaderExtractor{}, } testSession := testConsumerGroupSession{ctx: context.Background()} @@ -188,11 +189,12 @@ func TestTracesConsumerGroupHandler_session_done(t *testing.T) { obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()}) require.NoError(t, err) c := tracesConsumerGroupHandler{ - unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding), - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewNop(), - obsrecv: obsrecv, + unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding), + logger: zap.NewNop(), + ready: make(chan bool), + nextConsumer: consumertest.NewNop(), + obsrecv: obsrecv, + headerExtractor: &nopHeaderExtractor{}, } ctx, cancelFunc := context.WithCancel(context.Background()) @@ -234,11 +236,12 @@ func TestTracesConsumerGroupHandler_error_unmarshal(t *testing.T) { obsrecv, err := 
receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()}) require.NoError(t, err) c := tracesConsumerGroupHandler{ - unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding), - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewNop(), - obsrecv: obsrecv, + unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding), + logger: zap.NewNop(), + ready: make(chan bool), + nextConsumer: consumertest.NewNop(), + obsrecv: obsrecv, + headerExtractor: &nopHeaderExtractor{}, } wg := sync.WaitGroup{} @@ -261,11 +264,12 @@ func TestTracesConsumerGroupHandler_error_nextConsumer(t *testing.T) { obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()}) require.NoError(t, err) c := tracesConsumerGroupHandler{ - unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding), - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewErr(consumerError), - obsrecv: obsrecv, + unmarshaler: newPdataTracesUnmarshaler(&ptrace.ProtoUnmarshaler{}, defaultEncoding), + logger: zap.NewNop(), + ready: make(chan bool), + nextConsumer: consumertest.NewErr(consumerError), + obsrecv: obsrecv, + headerExtractor: &nopHeaderExtractor{}, } wg := sync.WaitGroup{} @@ -396,11 +400,12 @@ func TestMetricsConsumerGroupHandler(t *testing.T) { obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()}) require.NoError(t, err) c := metricsConsumerGroupHandler{ - unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding), - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewNop(), - obsrecv: obsrecv, + unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding), + logger: zap.NewNop(), + ready: make(chan bool), + nextConsumer: consumertest.NewNop(), + obsrecv: obsrecv, + headerExtractor: &nopHeaderExtractor{}, } testSession := testConsumerGroupSession{ctx: context.Background()} @@ -445,11 +450,12 @@ func TestMetricsConsumerGroupHandler_session_done(t *testing.T) { obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()}) require.NoError(t, err) c := metricsConsumerGroupHandler{ - unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding), - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewNop(), - obsrecv: obsrecv, + unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding), + logger: zap.NewNop(), + ready: make(chan bool), + nextConsumer: consumertest.NewNop(), + obsrecv: obsrecv, + headerExtractor: &nopHeaderExtractor{}, } ctx, cancelFunc := context.WithCancel(context.Background()) @@ -490,11 +496,12 @@ func TestMetricsConsumerGroupHandler_error_unmarshal(t *testing.T) { obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()}) require.NoError(t, err) c := metricsConsumerGroupHandler{ - unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding), - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewNop(), - obsrecv: obsrecv, + unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding), + 
logger: zap.NewNop(), + ready: make(chan bool), + nextConsumer: consumertest.NewNop(), + obsrecv: obsrecv, + headerExtractor: &nopHeaderExtractor{}, } wg := sync.WaitGroup{} @@ -517,11 +524,12 @@ func TestMetricsConsumerGroupHandler_error_nextConsumer(t *testing.T) { obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()}) require.NoError(t, err) c := metricsConsumerGroupHandler{ - unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding), - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewErr(consumerError), - obsrecv: obsrecv, + unmarshaler: newPdataMetricsUnmarshaler(&pmetric.ProtoUnmarshaler{}, defaultEncoding), + logger: zap.NewNop(), + ready: make(chan bool), + nextConsumer: consumertest.NewErr(consumerError), + obsrecv: obsrecv, + headerExtractor: &nopHeaderExtractor{}, } wg := sync.WaitGroup{} @@ -651,11 +659,12 @@ func TestLogsConsumerGroupHandler(t *testing.T) { obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()}) require.NoError(t, err) c := logsConsumerGroupHandler{ - unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding), - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewNop(), - obsrecv: obsrecv, + unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding), + logger: zap.NewNop(), + ready: make(chan bool), + nextConsumer: consumertest.NewNop(), + obsrecv: obsrecv, + headerExtractor: &nopHeaderExtractor{}, } testSession := testConsumerGroupSession{ctx: context.Background()} @@ -700,11 +709,12 @@ func TestLogsConsumerGroupHandler_session_done(t *testing.T) { obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()}) require.NoError(t, err) c := logsConsumerGroupHandler{ - unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding), - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewNop(), - obsrecv: obsrecv, + unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding), + logger: zap.NewNop(), + ready: make(chan bool), + nextConsumer: consumertest.NewNop(), + obsrecv: obsrecv, + headerExtractor: &nopHeaderExtractor{}, } ctx, cancelFunc := context.WithCancel(context.Background()) @@ -745,11 +755,12 @@ func TestLogsConsumerGroupHandler_error_unmarshal(t *testing.T) { obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()}) require.NoError(t, err) c := logsConsumerGroupHandler{ - unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding), - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewNop(), - obsrecv: obsrecv, + unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding), + logger: zap.NewNop(), + ready: make(chan bool), + nextConsumer: consumertest.NewNop(), + obsrecv: obsrecv, + headerExtractor: &nopHeaderExtractor{}, } wg := sync.WaitGroup{} @@ -772,11 +783,12 @@ func TestLogsConsumerGroupHandler_error_nextConsumer(t *testing.T) { obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverCreateSettings: receivertest.NewNopCreateSettings()}) require.NoError(t, err) c := logsConsumerGroupHandler{ - unmarshaler: 
newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding), - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: consumertest.NewErr(consumerError), - obsrecv: obsrecv, + unmarshaler: newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding), + logger: zap.NewNop(), + ready: make(chan bool), + nextConsumer: consumertest.NewErr(consumerError), + obsrecv: obsrecv, + headerExtractor: &nopHeaderExtractor{}, } wg := sync.WaitGroup{} @@ -842,11 +854,12 @@ func TestLogsConsumerGroupHandler_unmarshal_text(t *testing.T) { require.NoError(t, err) sink := &consumertest.LogsSink{} c := logsConsumerGroupHandler{ - unmarshaler: unmarshaler, - logger: zap.NewNop(), - ready: make(chan bool), - nextConsumer: sink, - obsrecv: obsrecv, + unmarshaler: unmarshaler, + logger: zap.NewNop(), + ready: make(chan bool), + nextConsumer: sink, + obsrecv: obsrecv, + headerExtractor: &nopHeaderExtractor{}, } wg := sync.WaitGroup{}