diff --git a/.chloggen/feat_ecs-format.yaml b/.chloggen/feat_ecs-format.yaml new file mode 100644 index 000000000000..26c1fdd22ed1 --- /dev/null +++ b/.chloggen/feat_ecs-format.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: elasticsearchexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Initial pass in implementing the `ecs` mapping mode + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [31553] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: Breaking change if mapping `mode` is set to `ecs`, use `none` to maintain existing format + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index a82137c6ccd5..51f038139cac 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -31,7 +31,7 @@ This exporter supports sending OpenTelemetry logs to [Elasticsearch](https://www [index](https://www.elastic.co/guide/en/elasticsearch/reference/current/indices.html) or [datastream](https://www.elastic.co/guide/en/elasticsearch/reference/current/data-streams.html) name to publish events to. The default value is `logs-generic-default` -- `logs_dynamic_index` (optional): +- `logs_dynamic_index` (optional): takes resource or log record attribute named `elasticsearch.index.prefix` and `elasticsearch.index.suffix` resulting dynamically prefixed / suffixed indexing based on `logs_index`. (priority: resource attribute > log record attribute) - `enabled`(default=false): Enable/Disable dynamic index for log records @@ -62,11 +62,11 @@ This exporter supports sending OpenTelemetry logs to [Elasticsearch](https://www - `max_interval` (default=1m): Max waiting time if a HTTP request failed. - `mapping`: Events are encoded to JSON. The `mapping` allows users to configure additional mapping rules. - - `mode` (default=ecs): The fields naming mode. valid modes are: + - `mode` (default=none): The fields naming mode. valid modes are: - `none`: Use original fields and event structure from the OTLP event. - `ecs`: Try to map fields defined in the [OpenTelemetry Semantic Conventions](https://github.com/open-telemetry/semantic-conventions) - to [Elastic Common Schema (ECS)](https://www.elastic.co/guide/en/ecs/current/index.html). + to [Elastic Common Schema (ECS)](https://www.elastic.co/guide/en/ecs/current/index.html). :warning: This mode's behavior is unstable, it is currently undergoing changes - `raw`: Omit the `Attributes.` string prefixed to field names for log and span attributes as well as omit the `Events.` string prefixed to field names for span events. diff --git a/exporter/elasticsearchexporter/attribute.go b/exporter/elasticsearchexporter/attribute.go index 115cd1930f03..c2c187899b77 100644 --- a/exporter/elasticsearchexporter/attribute.go +++ b/exporter/elasticsearchexporter/attribute.go @@ -18,12 +18,21 @@ type attrGetter interface { Attributes() pcommon.Map } -// retrieve attribute out of resource and record (span or log, if not found in resource) -func getFromBothResourceAndAttribute(name string, resource attrGetter, record attrGetter) string { +// retrieve attribute out of resource, scope, and record (span or log, if not found in resource) +func getFromAttributes(name string, resource, scope, record attrGetter) string { var str string val, exist := resource.Attributes().Get(name) if !exist { - val, exist = record.Attributes().Get(name) + val, exist = scope.Attributes().Get(name) + if !exist { + val, exist = record.Attributes().Get(name) + if exist { + str = val.AsString() + } + } + if exist { + str = val.AsString() + } } if exist { str = val.AsString() diff --git a/exporter/elasticsearchexporter/config_test.go b/exporter/elasticsearchexporter/config_test.go index 4a896d77e762..265d199638de 100644 --- a/exporter/elasticsearchexporter/config_test.go +++ b/exporter/elasticsearchexporter/config_test.go @@ -63,7 +63,7 @@ func TestLoad_DeprecatedIndexConfigOption(t *testing.T) { MaxInterval: 1 * time.Minute, }, Mapping: MappingsSettings{ - Mode: "ecs", + Mode: "none", Dedup: true, Dedot: true, }, @@ -138,7 +138,7 @@ func TestLoadConfig(t *testing.T) { MaxInterval: 1 * time.Minute, }, Mapping: MappingsSettings{ - Mode: "ecs", + Mode: "none", Dedup: true, Dedot: true, }, @@ -188,7 +188,7 @@ func TestLoadConfig(t *testing.T) { MaxInterval: 1 * time.Minute, }, Mapping: MappingsSettings{ - Mode: "ecs", + Mode: "none", Dedup: true, Dedot: true, }, diff --git a/exporter/elasticsearchexporter/factory.go b/exporter/elasticsearchexporter/factory.go index 518eba3eccff..44bc0e0a29a6 100644 --- a/exporter/elasticsearchexporter/factory.go +++ b/exporter/elasticsearchexporter/factory.go @@ -53,7 +53,7 @@ func createDefaultConfig() component.Config { MaxInterval: 1 * time.Minute, }, Mapping: MappingsSettings{ - Mode: "ecs", + Mode: "none", Dedup: true, Dedot: true, }, diff --git a/exporter/elasticsearchexporter/logs_exporter.go b/exporter/elasticsearchexporter/logs_exporter.go index 2c94d816418d..dfc7fd5eee9a 100644 --- a/exporter/elasticsearchexporter/logs_exporter.go +++ b/exporter/elasticsearchexporter/logs_exporter.go @@ -90,8 +90,9 @@ func (e *elasticsearchLogsExporter) pushLogsData(ctx context.Context, ld plog.Lo resource := rl.Resource() ills := rl.ScopeLogs() for j := 0; j < ills.Len(); j++ { - scope := ills.At(j).Scope() - logs := ills.At(j).LogRecords() + ill := ills.At(j) + scope := ill.Scope() + logs := ill.LogRecords() for k := 0; k < logs.Len(); k++ { if err := e.pushLogRecord(ctx, resource, logs.At(k), scope); err != nil { if cerr := ctx.Err(); cerr != nil { @@ -110,8 +111,8 @@ func (e *elasticsearchLogsExporter) pushLogsData(ctx context.Context, ld plog.Lo func (e *elasticsearchLogsExporter) pushLogRecord(ctx context.Context, resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) error { fIndex := e.index if e.dynamicIndex { - prefix := getFromBothResourceAndAttribute(indexPrefix, resource, record) - suffix := getFromBothResourceAndAttribute(indexSuffix, resource, record) + prefix := getFromAttributes(indexPrefix, resource, scope, record) + suffix := getFromAttributes(indexSuffix, resource, scope, record) fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix) } diff --git a/exporter/elasticsearchexporter/logs_exporter_test.go b/exporter/elasticsearchexporter/logs_exporter_test.go index d84e618e86e4..51870ee17295 100644 --- a/exporter/elasticsearchexporter/logs_exporter_test.go +++ b/exporter/elasticsearchexporter/logs_exporter_test.go @@ -122,7 +122,7 @@ func TestExporter_New(t *testing.T) { cfg.Mapping.Dedot = false cfg.Mapping.Dedup = true }), - want: successWithInternalModel(&encodeModel{dedot: false, dedup: true, mode: MappingECS}), + want: successWithInternalModel(&encodeModel{dedot: false, dedup: true, mode: MappingNone}), }, } @@ -153,6 +153,7 @@ func TestExporter_PushEvent(t *testing.T) { if runtime.GOOS == "windows" { t.Skip("skipping test on Windows, see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/10178") } + t.Run("publish with success", func(t *testing.T) { rec := newBulkRecorder() server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { @@ -167,6 +168,40 @@ func TestExporter_PushEvent(t *testing.T) { rec.WaitItems(2) }) + t.Run("publish with ecs encoding", func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + + expected := `{"@timestamp":"1970-01-01T00:00:00.000000000Z","application":"myapp","attrKey1":"abc","attrKey2":"def","error":{"stack_trace":"no no no no"},"message":"hello world","service":{"name":"myservice"}}` + actual := string(docs[0].Document) + assert.Equal(t, expected, actual) + + return itemsAllOK(docs) + }) + + testConfig := withTestExporterConfig(func(cfg *Config) { + cfg.Mapping.Mode = "ecs" + })(server.URL) + exporter := newTestExporter(t, server.URL, func(cfg *Config) { *cfg = *testConfig }) + mustSendLogsWithAttributes(t, exporter, + // record attrs + map[string]string{ + "application": "myapp", + "service.name": "myservice", + }, + // resource attrs + map[string]string{ + "attrKey1": "abc", + "attrKey2": "def", + "exception.stacktrace": "no no no no", + }, + // record body + "hello world", + ) + rec.WaitItems(1) + }) + t.Run("publish with dynamic index", func(t *testing.T) { rec := newBulkRecorder() @@ -206,6 +241,7 @@ func TestExporter_PushEvent(t *testing.T) { map[string]string{ indexPrefix: prefix, }, + "hello world", ) rec.WaitItems(1) @@ -237,7 +273,7 @@ func TestExporter_PushEvent(t *testing.T) { defaultCfg = *cfg }) - mustSendLogsWithAttributes(t, exporter, nil, nil) + mustSendLogsWithAttributes(t, exporter, nil, nil, "") rec.WaitItems(1) }) @@ -281,6 +317,7 @@ func TestExporter_PushEvent(t *testing.T) { map[string]string{ indexPrefix: prefix, }, + "", ) rec.WaitItems(1) }) @@ -476,14 +513,13 @@ func mustSend(t *testing.T, exporter *elasticsearchLogsExporter, contents string } // send trace with span & resource attributes -func mustSendLogsWithAttributes(t *testing.T, exporter *elasticsearchLogsExporter, attrMp map[string]string, resMp map[string]string) { +func mustSendLogsWithAttributes(t *testing.T, exporter *elasticsearchLogsExporter, attrMp map[string]string, resMp map[string]string, body string) { logs := newLogsWithAttributeAndResourceMap(attrMp, resMp) - resLogs := logs.ResourceLogs().At(0) - logRecords := resLogs.ScopeLogs().At(0).LogRecords().At(0) - - scopeLogs := resLogs.ScopeLogs().AppendEmpty() - scope := scopeLogs.Scope() + resSpans := logs.ResourceLogs().At(0) + scopeLog := resSpans.ScopeLogs().At(0) + logRecords := scopeLog.LogRecords().At(0) + logRecords.Body().SetStr(body) - err := exporter.pushLogRecord(context.TODO(), resLogs.Resource(), logRecords, scope) + err := exporter.pushLogRecord(context.TODO(), resSpans.Resource(), logRecords, scopeLog.Scope()) require.NoError(t, err) } diff --git a/exporter/elasticsearchexporter/model.go b/exporter/elasticsearchexporter/model.go index f303e1261e30..cccdfa386923 100644 --- a/exporter/elasticsearchexporter/model.go +++ b/exporter/elasticsearchexporter/model.go @@ -41,20 +41,72 @@ const ( func (m *encodeModel) encodeLog(resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) ([]byte, error) { var document objmodel.Document - docTimeStamp := record.Timestamp() - if docTimeStamp.AsTime().UnixNano() == 0 { - docTimeStamp = record.ObservedTimestamp() + + switch m.mode { + case MappingECS: + if record.Timestamp() != 0 { + document.AddTimestamp("@timestamp", record.Timestamp()) + } else { + document.AddTimestamp("@timestamp", record.ObservedTimestamp()) + } + + document.AddTraceID("trace.id", record.TraceID()) + document.AddSpanID("span.id", record.SpanID()) + + if n := record.SeverityNumber(); n != plog.SeverityNumberUnspecified { + document.AddInt("event.severity", int64(record.SeverityNumber())) + } + + document.AddString("log.level", record.SeverityText()) + + if record.Body().Type() == pcommon.ValueTypeStr { + document.AddAttribute("message", record.Body()) + } + + fieldMapper := func(k string) string { + switch k { + case "exception.type": + return "error.type" + case "exception.message": + return "error.message" + case "exception.stacktrace": + return "error.stack_trace" + default: + return k + } + } + + resource.Attributes().Range(func(k string, v pcommon.Value) bool { + k = fieldMapper(k) + document.AddAttribute(k, v) + return true + }) + scope.Attributes().Range(func(k string, v pcommon.Value) bool { + k = fieldMapper(k) + document.AddAttribute(k, v) + return true + }) + record.Attributes().Range(func(k string, v pcommon.Value) bool { + k = fieldMapper(k) + document.AddAttribute(k, v) + return true + }) + default: + docTimeStamp := record.Timestamp() + if docTimeStamp.AsTime().UnixNano() == 0 { + docTimeStamp = record.ObservedTimestamp() + } + document.AddTimestamp("@timestamp", docTimeStamp) // We use @timestamp in order to ensure that we can index if the default data stream logs template is used. + document.AddTraceID("TraceId", record.TraceID()) + document.AddSpanID("SpanId", record.SpanID()) + document.AddInt("TraceFlags", int64(record.Flags())) + document.AddString("SeverityText", record.SeverityText()) + document.AddInt("SeverityNumber", int64(record.SeverityNumber())) + document.AddAttribute("Body", record.Body()) + m.encodeAttributes(&document, record.Attributes()) + document.AddAttributes("Resource", resource.Attributes()) + document.AddAttributes("Scope", scopeToAttributes(scope)) } - document.AddTimestamp("@timestamp", docTimeStamp) // We use @timestamp in order to ensure that we can index if the default data stream logs template is used. - document.AddTraceID("TraceId", record.TraceID()) - document.AddSpanID("SpanId", record.SpanID()) - document.AddInt("TraceFlags", int64(record.Flags())) - document.AddString("SeverityText", record.SeverityText()) - document.AddInt("SeverityNumber", int64(record.SeverityNumber())) - document.AddAttribute("Body", record.Body()) - m.encodeAttributes(&document, record.Attributes()) - document.AddAttributes("Resource", resource.Attributes()) - document.AddAttributes("Scope", scopeToAttributes(scope)) if m.dedup { document.Dedup() diff --git a/exporter/elasticsearchexporter/trace_exporter.go b/exporter/elasticsearchexporter/trace_exporter.go index 0d5f0e28bc75..4f313f0d5546 100644 --- a/exporter/elasticsearchexporter/trace_exporter.go +++ b/exporter/elasticsearchexporter/trace_exporter.go @@ -83,10 +83,12 @@ func (e *elasticsearchTracesExporter) pushTraceData( resource := il.Resource() scopeSpans := il.ScopeSpans() for j := 0; j < scopeSpans.Len(); j++ { - scope := scopeSpans.At(j).Scope() - spans := scopeSpans.At(j).Spans() + scopeSpan := scopeSpans.At(j) + scope := scopeSpan.Scope() + spans := scopeSpan.Spans() for k := 0; k < spans.Len(); k++ { - if err := e.pushTraceRecord(ctx, resource, spans.At(k), scope); err != nil { + span := spans.At(k) + if err := e.pushTraceRecord(ctx, resource, span, scope); err != nil { if cerr := ctx.Err(); cerr != nil { return cerr } @@ -102,8 +104,8 @@ func (e *elasticsearchTracesExporter) pushTraceData( func (e *elasticsearchTracesExporter) pushTraceRecord(ctx context.Context, resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) error { fIndex := e.index if e.dynamicIndex { - prefix := getFromBothResourceAndAttribute(indexPrefix, resource, span) - suffix := getFromBothResourceAndAttribute(indexSuffix, resource, span) + prefix := getFromAttributes(indexPrefix, resource, scope, span) + suffix := getFromAttributes(indexSuffix, resource, scope, span) fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix) } diff --git a/exporter/elasticsearchexporter/traces_exporter_test.go b/exporter/elasticsearchexporter/traces_exporter_test.go index 34cc78b912cb..b34354bdb0fb 100644 --- a/exporter/elasticsearchexporter/traces_exporter_test.go +++ b/exporter/elasticsearchexporter/traces_exporter_test.go @@ -99,7 +99,7 @@ func TestTracesExporter_New(t *testing.T) { cfg.Mapping.Dedot = false cfg.Mapping.Dedup = true }), - want: successWithInternalModel(&encodeModel{dedot: false, dedup: true, mode: MappingECS}), + want: successWithInternalModel(&encodeModel{dedot: false, dedup: true, mode: MappingNone}), }, }