From cb15832db41b45f02b4f11f4920496b75f50c572 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Fri, 4 Oct 2024 10:25:55 -0300 Subject: [PATCH 01/17] add mapping mode bodymap --- exporter/elasticsearchexporter/README.md | 5 ++- exporter/elasticsearchexporter/config.go | 4 +++ .../elasticsearchexporter/exporter_test.go | 33 +++++++++++++++++++ exporter/elasticsearchexporter/model.go | 10 ++++++ exporter/elasticsearchexporter/model_test.go | 30 +++++++++++++++++ 5 files changed, 81 insertions(+), 1 deletion(-) diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index 31d2cae89c6f..42bafe755daf 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -162,7 +162,10 @@ behaviours, which may be configured through the following settings: - `raw`: Omit the `Attributes.` string prefixed to field names for log and span attributes as well as omit the `Events.` string prefixed to - field names for span events. + field names for span events. + - `bodymap`: Provides fine-grained control over the final documents to be ingested. + It works only for logs where the log record body is a map. Each LogRecord + body is serialized to JSON as-is and becomes a separate document for ingestion. - `dedup` (DEPRECATED). This configuration is deprecated and non-operational, and will be removed in the future. Object keys are always deduplicated to avoid Elasticsearch rejecting documents. diff --git a/exporter/elasticsearchexporter/config.go b/exporter/elasticsearchexporter/config.go index c2a0f755d9fa..f2567b51fc9d 100644 --- a/exporter/elasticsearchexporter/config.go +++ b/exporter/elasticsearchexporter/config.go @@ -207,6 +207,7 @@ const ( MappingECS MappingOTel MappingRaw + MappingBodyMap ) var ( @@ -224,6 +225,8 @@ func (m MappingMode) String() string { return "otel" case MappingRaw: return "raw" + case MappingBodyMap: + return "bodymap" default: return "" } @@ -236,6 +239,7 @@ var mappingModes = func() map[string]MappingMode { MappingECS, MappingOTel, MappingRaw, + MappingBodyMap, } { table[strings.ToLower(m.String())] = m } diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index 77ff8e2d0b47..d36fac52c417 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -82,6 +82,39 @@ func TestExporterLogs(t *testing.T) { rec.WaitItems(1) }) + t.Run("publish with bodymap encoding", func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + assert.JSONEq(t, + `{"@timestamp":"2024-03-12T20:00:41.123456789Z","id":1,"key":"value"}`, + string(docs[0].Document), + ) + rec.Record(docs) + return itemsAllOK(docs) + }) + + exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { + cfg.Mapping.Mode = "bodymap" + }) + + logs := plog.NewLogs() + resourceLogs := logs.ResourceLogs().AppendEmpty() + scopeLogs := resourceLogs.ScopeLogs().AppendEmpty() + logRecords := scopeLogs.LogRecords() + observedTimestamp := pcommon.Timestamp(time.Now().UnixNano()) + logRecord := logRecords.AppendEmpty() + + logRecord.SetObservedTimestamp(observedTimestamp) + bodyMap := pcommon.NewMap() + bodyMap.PutStr("@timestamp", "2024-03-12T20:00:41.123456789Z") + bodyMap.PutInt("id", 1) + bodyMap.PutStr("key", "value") + bodyMap.CopyTo(logRecord.Body().SetEmptyMap()) + + mustSendLogs(t, exporter, logs) + rec.WaitItems(1) + }) + t.Run("publish with dedot", func(t *testing.T) { rec := newBulkRecorder() server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { diff --git a/exporter/elasticsearchexporter/model.go b/exporter/elasticsearchexporter/model.go index ce4b9a3a22da..43fb57bc7491 100644 --- a/exporter/elasticsearchexporter/model.go +++ b/exporter/elasticsearchexporter/model.go @@ -107,6 +107,8 @@ func (m *encodeModel) encodeLog(resource pcommon.Resource, resourceSchemaURL str document = m.encodeLogECSMode(resource, record, scope) case MappingOTel: document = m.encodeLogOTelMode(resource, resourceSchemaURL, record, scope, scopeSchemaURL) + case MappingBodyMap: + document = m.encodeLogBodyMapMode(record) default: document = m.encodeLogDefaultMode(resource, record, scope) } @@ -138,6 +140,14 @@ func (m *encodeModel) encodeLogDefaultMode(resource pcommon.Resource, record plo return document } +func (m *encodeModel) encodeLogBodyMapMode(record plog.LogRecord) objmodel.Document { + body := record.Body() + if body.Type() != pcommon.ValueTypeMap { + return objmodel.Document{} + } + return objmodel.DocumentFromAttributes(body.Map()) +} + func (m *encodeModel) encodeLogOTelMode(resource pcommon.Resource, resourceSchemaURL string, record plog.LogRecord, scope pcommon.InstrumentationScope, scopeSchemaURL string) objmodel.Document { var document objmodel.Document diff --git a/exporter/elasticsearchexporter/model_test.go b/exporter/elasticsearchexporter/model_test.go index ec17db600f78..839d97f24762 100644 --- a/exporter/elasticsearchexporter/model_test.go +++ b/exporter/elasticsearchexporter/model_test.go @@ -1208,3 +1208,33 @@ func TestEncodeLogScalarObjectConflict(t *testing.T) { fooValue = gjson.GetBytes(encoded, "Attributes\\.foo\\.value") assert.Equal(t, "foovalue", fooValue.Str) } + +func TestEncodeLogBodyMapMode(t *testing.T) { + // craft a log record with a body map + logs := plog.NewLogs() + resourceLogs := logs.ResourceLogs().AppendEmpty() + scopeLogs := resourceLogs.ScopeLogs().AppendEmpty() + logRecords := scopeLogs.LogRecords() + observedTimestamp := pcommon.Timestamp(time.Now().UnixNano()) + + logRecord := logRecords.AppendEmpty() + logRecord.SetObservedTimestamp(observedTimestamp) + + bodyMap := pcommon.NewMap() + bodyMap.PutStr("@timestamp", "2024-03-12T20:00:41.123456789Z") + bodyMap.PutInt("id", 1) + bodyMap.PutStr("key", "value") + bodyMap.CopyTo(logRecord.Body().SetEmptyMap()) + + var buf bytes.Buffer + m := encodeModel{} + doc := m.encodeLogBodyMapMode(logRecord) + require.NoError(t, doc.Serialize(&buf, false, false)) + + got := buf.String() + require.JSONEq(t, `{ + "@timestamp": "2024-03-12T20:00:41.123456789Z", + "id": 1, + "key": "value" + }`, got) +} From 24f4f0d7cae225f856792d669d36bcc77405ea55 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Tue, 8 Oct 2024 08:02:44 -0300 Subject: [PATCH 02/17] add changelog entry --- ...feature_elasticsearch_mapping_bodymap.yaml | 27 +++++++++++++++++++ exporter/elasticsearchexporter/README.md | 1 + 2 files changed, 28 insertions(+) create mode 100644 .chloggen/feature_elasticsearch_mapping_bodymap.yaml diff --git a/.chloggen/feature_elasticsearch_mapping_bodymap.yaml b/.chloggen/feature_elasticsearch_mapping_bodymap.yaml new file mode 100644 index 000000000000..edca66c37c08 --- /dev/null +++ b/.chloggen/feature_elasticsearch_mapping_bodymap.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: elasticsearchexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Introduce an experimental mapbody mapping mode for logs + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35444] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] \ No newline at end of file diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index 42bafe755daf..30c054ac8e5a 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -164,6 +164,7 @@ behaviours, which may be configured through the following settings: span attributes as well as omit the `Events.` string prefixed to field names for span events. - `bodymap`: Provides fine-grained control over the final documents to be ingested. + :warning: This mode's behavior is unstable, it is currently is experimental and undergoing changes. It works only for logs where the log record body is a map. Each LogRecord body is serialized to JSON as-is and becomes a separate document for ingestion. - `dedup` (DEPRECATED). This configuration is deprecated and non-operational, From 7078c705efc2904cf0393c73be180828881914ca Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Tue, 8 Oct 2024 09:18:12 -0300 Subject: [PATCH 03/17] Apply suggestions from code review Co-authored-by: Carson Ip --- .chloggen/feature_elasticsearch_mapping_bodymap.yaml | 2 +- exporter/elasticsearchexporter/README.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.chloggen/feature_elasticsearch_mapping_bodymap.yaml b/.chloggen/feature_elasticsearch_mapping_bodymap.yaml index edca66c37c08..9d749c7b3c45 100644 --- a/.chloggen/feature_elasticsearch_mapping_bodymap.yaml +++ b/.chloggen/feature_elasticsearch_mapping_bodymap.yaml @@ -7,7 +7,7 @@ change_type: enhancement component: elasticsearchexporter # A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). -note: Introduce an experimental mapbody mapping mode for logs +note: Introduce an experimental bodymap mapping mode for logs # Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. issues: [35444] diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index 30c054ac8e5a..d0de8aec4b18 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -165,7 +165,7 @@ behaviours, which may be configured through the following settings: field names for span events. - `bodymap`: Provides fine-grained control over the final documents to be ingested. :warning: This mode's behavior is unstable, it is currently is experimental and undergoing changes. - It works only for logs where the log record body is a map. Each LogRecord + It works only for logs where the log record body is a map. Each LogRecord body is serialized to JSON as-is and becomes a separate document for ingestion. - `dedup` (DEPRECATED). This configuration is deprecated and non-operational, and will be removed in the future. Object keys are always deduplicated to From 98b66d4b64469eb23ec5b86030fd42f655a881e7 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Tue, 8 Oct 2024 10:06:17 -0300 Subject: [PATCH 04/17] add more testcases --- .../elasticsearchexporter/exporter_test.go | 91 +++++++++++++------ 1 file changed, 62 insertions(+), 29 deletions(-) diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index d36fac52c417..0efdcb772175 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -83,36 +83,69 @@ func TestExporterLogs(t *testing.T) { }) t.Run("publish with bodymap encoding", func(t *testing.T) { - rec := newBulkRecorder() - server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { - assert.JSONEq(t, - `{"@timestamp":"2024-03-12T20:00:41.123456789Z","id":1,"key":"value"}`, - string(docs[0].Document), - ) - rec.Record(docs) - return itemsAllOK(docs) - }) - - exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { - cfg.Mapping.Mode = "bodymap" - }) - - logs := plog.NewLogs() - resourceLogs := logs.ResourceLogs().AppendEmpty() - scopeLogs := resourceLogs.ScopeLogs().AppendEmpty() - logRecords := scopeLogs.LogRecords() - observedTimestamp := pcommon.Timestamp(time.Now().UnixNano()) - logRecord := logRecords.AppendEmpty() - - logRecord.SetObservedTimestamp(observedTimestamp) - bodyMap := pcommon.NewMap() - bodyMap.PutStr("@timestamp", "2024-03-12T20:00:41.123456789Z") - bodyMap.PutInt("id", 1) - bodyMap.PutStr("key", "value") - bodyMap.CopyTo(logRecord.Body().SetEmptyMap()) + tableTests := []struct { + name string + body func() pcommon.Map + expected string + }{ + { + name: "flat", + body: func() pcommon.Map { + body := pcommon.NewMap() + body.PutStr("@timestamp", "2024-03-12T20:00:41.123456789Z") + body.PutInt("id", 1) + body.PutStr("key", "value") + return body + }, + expected: `{"@timestamp":"2024-03-12T20:00:41.123456789Z","id":1,"key":"value"}`, + }, + { + name: "dot", + body: func() pcommon.Map { + body := pcommon.NewMap() + body.PutInt("a", 1) + body.PutInt("a.b", 2) + body.PutInt("a.b.c", 3) + return body + }, + expected: `{"a":{"value":1,"b":{"value":2,"c":3}}}`, + }, + { + name: "nested map", + body: func() pcommon.Map { + body := pcommon.NewMap() + body.PutInt("a", 1) + b := body.PutEmptyMap("b") + b.PutInt("c", 2) + return body + }, + expected: `{"a":1,"b":{"c":2}}`, + }, + } - mustSendLogs(t, exporter, logs) - rec.WaitItems(1) + for _, tt := range tableTests { + t.Run(tt.name, func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + assert.JSONEq(t, tt.expected, string(docs[0].Document)) + return itemsAllOK(docs) + }) + + exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { + cfg.Mapping.Mode = "bodymap" + }) + logs := plog.NewLogs() + resourceLogs := logs.ResourceLogs().AppendEmpty() + scopeLogs := resourceLogs.ScopeLogs().AppendEmpty() + logRecords := scopeLogs.LogRecords() + logRecord := logRecords.AppendEmpty() + tt.body().CopyTo(logRecord.Body().SetEmptyMap()) + + mustSendLogs(t, exporter, logs) + rec.WaitItems(1) + }) + } }) t.Run("publish with dedot", func(t *testing.T) { From 36e3443a727531e4ffb75fc5ddd5f006b69ea43f Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Thu, 10 Oct 2024 15:48:37 -0300 Subject: [PATCH 05/17] use json.Marshal instead of objmodel --- .../elasticsearchexporter/exporter_test.go | 36 +++++++++++++++---- exporter/elasticsearchexporter/model.go | 9 ++--- exporter/elasticsearchexporter/model_test.go | 16 +++++---- 3 files changed, 45 insertions(+), 16 deletions(-) diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index 0efdcb772175..ecad417cfe69 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -100,7 +100,7 @@ func TestExporterLogs(t *testing.T) { expected: `{"@timestamp":"2024-03-12T20:00:41.123456789Z","id":1,"key":"value"}`, }, { - name: "dot", + name: "dotted key", body: func() pcommon.Map { body := pcommon.NewMap() body.PutInt("a", 1) @@ -108,18 +108,42 @@ func TestExporterLogs(t *testing.T) { body.PutInt("a.b.c", 3) return body }, - expected: `{"a":{"value":1,"b":{"value":2,"c":3}}}`, + expected: `{"a":1,"a.b":2,"a.b.c":3}`, + }, + { + name: "slice", + body: func() pcommon.Map { + body := pcommon.NewMap() + s := body.PutEmptySlice("a") + for i := 0; i < 2; i++ { + s.AppendEmpty().SetInt(int64(i)) + } + return body + }, + expected: `{"a":[0,1]}`, + }, + { + name: "inner map", + body: func() pcommon.Map { + body := pcommon.NewMap() + m := body.PutEmptyMap("a") + m.PutInt("b", 1) + m.PutInt("c", 2) + return body + }, + expected: `{"a":{"b":1,"c":2}}`, }, { name: "nested map", body: func() pcommon.Map { body := pcommon.NewMap() - body.PutInt("a", 1) - b := body.PutEmptyMap("b") - b.PutInt("c", 2) + m1 := body.PutEmptyMap("a") + m2 := m1.PutEmptyMap("b") + m2.PutInt("c", 1) + m2.PutInt("d", 2) return body }, - expected: `{"a":1,"b":{"c":2}}`, + expected: `{"a":{"b":{"c":1,"d":2}}}`, }, } diff --git a/exporter/elasticsearchexporter/model.go b/exporter/elasticsearchexporter/model.go index 43fb57bc7491..e2680312885b 100644 --- a/exporter/elasticsearchexporter/model.go +++ b/exporter/elasticsearchexporter/model.go @@ -108,7 +108,7 @@ func (m *encodeModel) encodeLog(resource pcommon.Resource, resourceSchemaURL str case MappingOTel: document = m.encodeLogOTelMode(resource, resourceSchemaURL, record, scope, scopeSchemaURL) case MappingBodyMap: - document = m.encodeLogBodyMapMode(record) + return m.encodeLogBodyMapMode(record) default: document = m.encodeLogDefaultMode(resource, record, scope) } @@ -140,12 +140,13 @@ func (m *encodeModel) encodeLogDefaultMode(resource pcommon.Resource, record plo return document } -func (m *encodeModel) encodeLogBodyMapMode(record plog.LogRecord) objmodel.Document { +func (m *encodeModel) encodeLogBodyMapMode(record plog.LogRecord) ([]byte, error) { body := record.Body() if body.Type() != pcommon.ValueTypeMap { - return objmodel.Document{} + return []byte{}, nil } - return objmodel.DocumentFromAttributes(body.Map()) + + return json.Marshal(body.Map().AsRaw()) } func (m *encodeModel) encodeLogOTelMode(resource pcommon.Resource, resourceSchemaURL string, record plog.LogRecord, scope pcommon.InstrumentationScope, scopeSchemaURL string) objmodel.Document { diff --git a/exporter/elasticsearchexporter/model_test.go b/exporter/elasticsearchexporter/model_test.go index 839d97f24762..384b14be987d 100644 --- a/exporter/elasticsearchexporter/model_test.go +++ b/exporter/elasticsearchexporter/model_test.go @@ -1224,17 +1224,21 @@ func TestEncodeLogBodyMapMode(t *testing.T) { bodyMap.PutStr("@timestamp", "2024-03-12T20:00:41.123456789Z") bodyMap.PutInt("id", 1) bodyMap.PutStr("key", "value") + bodyMap.PutStr("key.a", "a") + bodyMap.PutStr("key.a.b", "b") + bodyMap.PutDouble("pi", 3.14) bodyMap.CopyTo(logRecord.Body().SetEmptyMap()) - var buf bytes.Buffer m := encodeModel{} - doc := m.encodeLogBodyMapMode(logRecord) - require.NoError(t, doc.Serialize(&buf, false, false)) + got, err := m.encodeLogBodyMapMode(logRecord) + require.NoError(t, err) - got := buf.String() require.JSONEq(t, `{ "@timestamp": "2024-03-12T20:00:41.123456789Z", "id": 1, - "key": "value" - }`, got) + "key": "value", + "key.a": "a", + "key.a.b": "b", + "pi": 3.14 + }`, string(got)) } From ddc48fa360a92accc1bed1f35b4a713909b9c257 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Thu, 10 Oct 2024 15:51:35 -0300 Subject: [PATCH 06/17] return error if record body is not a map --- exporter/elasticsearchexporter/model.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/elasticsearchexporter/model.go b/exporter/elasticsearchexporter/model.go index e2680312885b..21183ecc49ce 100644 --- a/exporter/elasticsearchexporter/model.go +++ b/exporter/elasticsearchexporter/model.go @@ -143,7 +143,7 @@ func (m *encodeModel) encodeLogDefaultMode(resource pcommon.Resource, record plo func (m *encodeModel) encodeLogBodyMapMode(record plog.LogRecord) ([]byte, error) { body := record.Body() if body.Type() != pcommon.ValueTypeMap { - return []byte{}, nil + return []byte{}, errors.New("record body is not a map") } return json.Marshal(body.Map().AsRaw()) From 36418c33ecf30ed4ec883031fd0b5aec60648f70 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Fri, 11 Oct 2024 08:14:21 -0300 Subject: [PATCH 07/17] use jsoniter --- exporter/elasticsearchexporter/go.mod | 2 +- exporter/elasticsearchexporter/model.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/exporter/elasticsearchexporter/go.mod b/exporter/elasticsearchexporter/go.mod index bd98fab282f8..292a576bf9e3 100644 --- a/exporter/elasticsearchexporter/go.mod +++ b/exporter/elasticsearchexporter/go.mod @@ -7,6 +7,7 @@ require ( github.com/elastic/go-docappender/v2 v2.3.0 github.com/elastic/go-elasticsearch/v7 v7.17.10 github.com/elastic/go-structform v0.0.12 + github.com/json-iterator/go v1.1.12 github.com/lestrrat-go/strftime v1.1.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.110.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.110.0 @@ -44,7 +45,6 @@ require ( github.com/golang/snappy v0.0.4 // indirect github.com/google/uuid v1.6.0 // indirect github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 // indirect - github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.17.10 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/knadh/koanf/providers/confmap v0.1.0 // indirect diff --git a/exporter/elasticsearchexporter/model.go b/exporter/elasticsearchexporter/model.go index 21183ecc49ce..c29f03c1e89b 100644 --- a/exporter/elasticsearchexporter/model.go +++ b/exporter/elasticsearchexporter/model.go @@ -15,6 +15,7 @@ import ( "slices" "time" + jsoniter "github.com/json-iterator/go" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" @@ -146,7 +147,7 @@ func (m *encodeModel) encodeLogBodyMapMode(record plog.LogRecord) ([]byte, error return []byte{}, errors.New("record body is not a map") } - return json.Marshal(body.Map().AsRaw()) + return jsoniter.Marshal(body.Map().AsRaw()) } func (m *encodeModel) encodeLogOTelMode(resource pcommon.Resource, resourceSchemaURL string, record plog.LogRecord, scope pcommon.InstrumentationScope, scopeSchemaURL string) objmodel.Document { From c509a208b915661cd0ce751c5c8c3c81b0fb914a Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Mon, 14 Oct 2024 08:49:03 -0300 Subject: [PATCH 08/17] fix typo --- exporter/elasticsearchexporter/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index 085431811120..d5b7850e690e 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -155,7 +155,7 @@ behaviours, which may be configured through the following settings: - `none`: Use original fields and event structure from the OTLP event. - `ecs`: Try to map fields to [Elastic Common Schema (ECS)][ECS] - `otel`: Elastic's preferred "OTel-native" mapping mode. Uses original fields and event structure from the OTLP event. - - :warning: This mode's behavior is unstable, it is currently is experimental and undergoing changes. + - :warning: This mode's behavior is unstable, it is currently experimental and undergoing changes. - There's a special treatment for the following attributes: `data_stream.type`, `data_stream.dataset`, `data_stream.namespace`. Instead of serializing these values under the `*attributes.*` namespace, they're put at the root of the document, to conform with the conventions of the data stream naming scheme that maps these as `constant_keyword` fields. - `data_stream.dataset` will always be appended with `.otel`. It is recommended to use with `*_dynamic_index.enabled: true` to route documents to data stream `${data_stream.type}-${data_stream.dataset}-${data_stream.namespace}`. - Span events are stored in separate documents. They will be routed with `data_stream.type` set to `logs` if `traces_dynamic_index::enabled` is `true`. @@ -164,7 +164,7 @@ behaviours, which may be configured through the following settings: span attributes as well as omit the `Events.` string prefixed to field names for span events. - `bodymap`: Provides fine-grained control over the final documents to be ingested. - :warning: This mode's behavior is unstable, it is currently is experimental and undergoing changes. + :warning: This mode's behavior is unstable, it is currently experimental and undergoing changes. It works only for logs where the log record body is a map. Each LogRecord body is serialized to JSON as-is and becomes a separate document for ingestion. - `dedup` (DEPRECATED). This configuration is deprecated and non-operational, From 44bb7dae77f28d46d2a2c1f94d163b82ffb5e06b Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Mon, 14 Oct 2024 09:58:17 -0300 Subject: [PATCH 09/17] do not return error if body is not a map --- .../elasticsearchexporter/exporter_test.go | 59 +++++++++++-------- exporter/elasticsearchexporter/model.go | 7 ++- 2 files changed, 40 insertions(+), 26 deletions(-) diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index ecad417cfe69..987386e4a115 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -85,36 +85,47 @@ func TestExporterLogs(t *testing.T) { t.Run("publish with bodymap encoding", func(t *testing.T) { tableTests := []struct { name string - body func() pcommon.Map + body func() pcommon.Value expected string }{ + { + name: "body not a map", + body: func() pcommon.Value { + body := pcommon.NewValueInt(42) + return body + }, + expected: `{}`, + }, { name: "flat", - body: func() pcommon.Map { - body := pcommon.NewMap() - body.PutStr("@timestamp", "2024-03-12T20:00:41.123456789Z") - body.PutInt("id", 1) - body.PutStr("key", "value") + body: func() pcommon.Value { + body := pcommon.NewValueMap() + m := body.Map() + m.PutStr("@timestamp", "2024-03-12T20:00:41.123456789Z") + m.PutInt("id", 1) + m.PutStr("key", "value") return body }, expected: `{"@timestamp":"2024-03-12T20:00:41.123456789Z","id":1,"key":"value"}`, }, { name: "dotted key", - body: func() pcommon.Map { - body := pcommon.NewMap() - body.PutInt("a", 1) - body.PutInt("a.b", 2) - body.PutInt("a.b.c", 3) + body: func() pcommon.Value { + body := pcommon.NewValueMap() + m := body.Map() + m.PutInt("a", 1) + m.PutInt("a.b", 2) + m.PutInt("a.b.c", 3) return body }, expected: `{"a":1,"a.b":2,"a.b.c":3}`, }, { name: "slice", - body: func() pcommon.Map { - body := pcommon.NewMap() - s := body.PutEmptySlice("a") + body: func() pcommon.Value { + body := pcommon.NewValueMap() + m := body.Map() + s := m.PutEmptySlice("a") for i := 0; i < 2; i++ { s.AppendEmpty().SetInt(int64(i)) } @@ -124,20 +135,22 @@ func TestExporterLogs(t *testing.T) { }, { name: "inner map", - body: func() pcommon.Map { - body := pcommon.NewMap() - m := body.PutEmptyMap("a") - m.PutInt("b", 1) - m.PutInt("c", 2) + body: func() pcommon.Value { + body := pcommon.NewValueMap() + m := body.Map() + m1 := m.PutEmptyMap("a") + m1.PutInt("b", 1) + m1.PutInt("c", 2) return body }, expected: `{"a":{"b":1,"c":2}}`, }, { name: "nested map", - body: func() pcommon.Map { - body := pcommon.NewMap() - m1 := body.PutEmptyMap("a") + body: func() pcommon.Value { + body := pcommon.NewValueMap() + m := body.Map() + m1 := m.PutEmptyMap("a") m2 := m1.PutEmptyMap("b") m2.PutInt("c", 1) m2.PutInt("d", 2) @@ -164,7 +177,7 @@ func TestExporterLogs(t *testing.T) { scopeLogs := resourceLogs.ScopeLogs().AppendEmpty() logRecords := scopeLogs.LogRecords() logRecord := logRecords.AppendEmpty() - tt.body().CopyTo(logRecord.Body().SetEmptyMap()) + tt.body().CopyTo(logRecord.Body()) mustSendLogs(t, exporter, logs) rec.WaitItems(1) diff --git a/exporter/elasticsearchexporter/model.go b/exporter/elasticsearchexporter/model.go index c29f03c1e89b..b1d3c8d279cc 100644 --- a/exporter/elasticsearchexporter/model.go +++ b/exporter/elasticsearchexporter/model.go @@ -143,11 +143,12 @@ func (m *encodeModel) encodeLogDefaultMode(resource pcommon.Resource, record plo func (m *encodeModel) encodeLogBodyMapMode(record plog.LogRecord) ([]byte, error) { body := record.Body() - if body.Type() != pcommon.ValueTypeMap { - return []byte{}, errors.New("record body is not a map") + bm := pcommon.NewMap() + if body.Type() == pcommon.ValueTypeMap { + bm = body.Map() } - return jsoniter.Marshal(body.Map().AsRaw()) + return jsoniter.Marshal(bm.AsRaw()) } func (m *encodeModel) encodeLogOTelMode(resource pcommon.Resource, resourceSchemaURL string, record plog.LogRecord, scope pcommon.InstrumentationScope, scopeSchemaURL string) objmodel.Document { From a0d8c58ba81d259531ba7e07990f9c81b8bf0e91 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Mon, 14 Oct 2024 10:04:42 -0300 Subject: [PATCH 10/17] add doc for not a map case --- exporter/elasticsearchexporter/README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index d5b7850e690e..b22d52ed6202 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -167,6 +167,7 @@ behaviours, which may be configured through the following settings: :warning: This mode's behavior is unstable, it is currently experimental and undergoing changes. It works only for logs where the log record body is a map. Each LogRecord body is serialized to JSON as-is and becomes a separate document for ingestion. + If the log record body is not a map, it results in an empty document. - `dedup` (DEPRECATED). This configuration is deprecated and non-operational, and will be removed in the future. Object keys are always deduplicated to avoid Elasticsearch rejecting documents. From 0e5f332a8be313bad68340c3d3ff69592f519d15 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Tue, 15 Oct 2024 08:13:10 -0300 Subject: [PATCH 11/17] Update exporter/elasticsearchexporter/README.md Co-authored-by: Carson Ip --- exporter/elasticsearchexporter/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index b22d52ed6202..0c95d4babac9 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -167,7 +167,7 @@ behaviours, which may be configured through the following settings: :warning: This mode's behavior is unstable, it is currently experimental and undergoing changes. It works only for logs where the log record body is a map. Each LogRecord body is serialized to JSON as-is and becomes a separate document for ingestion. - If the log record body is not a map, it results in an empty document. + If the log record body is not a map, the exporter will log a warning and drop the log record. - `dedup` (DEPRECATED). This configuration is deprecated and non-operational, and will be removed in the future. Object keys are always deduplicated to avoid Elasticsearch rejecting documents. From a71f1b0d7895552191fc0c7996af34b6c40bec8c Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Tue, 15 Oct 2024 08:54:37 -0300 Subject: [PATCH 12/17] drop log records whose body is not a map, add tests --- exporter/elasticsearchexporter/exporter.go | 4 +++ .../elasticsearchexporter/exporter_test.go | 34 +++++++++++++++++++ exporter/elasticsearchexporter/model.go | 9 ++--- exporter/elasticsearchexporter/model_test.go | 6 ++++ 4 files changed, 49 insertions(+), 4 deletions(-) diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index 17d2dc8578f0..1ab09c680488 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -130,6 +130,10 @@ func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs) return cerr } + if errors.Is(err, ErrInvalidTypeForBodyMapMode) { + errs = append(errs, fmt.Errorf("dropping log record: %w", err)) + } + errs = append(errs, err) } } diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index 987386e4a115..7b9b410cd41d 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -185,6 +185,40 @@ func TestExporterLogs(t *testing.T) { } }) + t.Run("drops log records for bodymap mode if body is not a map", func(t *testing.T) { + logs := plog.NewLogs() + resourceLogs := logs.ResourceLogs().AppendEmpty() + scopeLogs := resourceLogs.ScopeLogs().AppendEmpty() + logRecords := scopeLogs.LogRecords() + + // Invalid body type should be dropped. + logRecords.AppendEmpty().Body().SetEmptySlice() + + // We should still process the valid records in the batch. + bodyMap := logRecords.AppendEmpty().Body().SetEmptyMap() + bodyMap.PutInt("a", 42) + + rec := newBulkRecorder() + var done atomic.Bool + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + defer done.Store(true) + rec.Record(docs) + assert.Len(t, docs, 1) + assert.JSONEq(t, `{"a":42}`, string(docs[0].Document)) + return itemsAllOK(docs) + }) + + exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { + cfg.Mapping.Mode = "bodymap" + }) + + err := exporter.ConsumeLogs(context.Background(), logs) + assert.ErrorContains(t, err, `dropping log record: failed to encode log event: invalid log record body type for 'bodymap' mapping mode: "Slice"`) + assert.Eventually(t, func() bool { + return done.Load() + }, 100*time.Millisecond, time.Millisecond, "timeout waiting for valid log to be processed") + }) + t.Run("publish with dedot", func(t *testing.T) { rec := newBulkRecorder() server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { diff --git a/exporter/elasticsearchexporter/model.go b/exporter/elasticsearchexporter/model.go index b1d3c8d279cc..c6b91afe5885 100644 --- a/exporter/elasticsearchexporter/model.go +++ b/exporter/elasticsearchexporter/model.go @@ -66,6 +66,8 @@ var resourceAttrsToPreserve = map[string]bool{ semconv.AttributeHostName: true, } +var ErrInvalidTypeForBodyMapMode = errors.New("invalid log record body type for 'bodymap' mapping mode") + type mappingModel interface { encodeLog(pcommon.Resource, string, plog.LogRecord, pcommon.InstrumentationScope, string) ([]byte, error) encodeSpan(pcommon.Resource, string, ptrace.Span, pcommon.InstrumentationScope, string) ([]byte, error) @@ -143,12 +145,11 @@ func (m *encodeModel) encodeLogDefaultMode(resource pcommon.Resource, record plo func (m *encodeModel) encodeLogBodyMapMode(record plog.LogRecord) ([]byte, error) { body := record.Body() - bm := pcommon.NewMap() - if body.Type() == pcommon.ValueTypeMap { - bm = body.Map() + if body.Type() != pcommon.ValueTypeMap { + return nil, fmt.Errorf("%w: %q", ErrInvalidTypeForBodyMapMode, body.Type()) } - return jsoniter.Marshal(bm.AsRaw()) + return jsoniter.Marshal(body.Map().AsRaw()) } func (m *encodeModel) encodeLogOTelMode(resource pcommon.Resource, resourceSchemaURL string, record plog.LogRecord, scope pcommon.InstrumentationScope, scopeSchemaURL string) objmodel.Document { diff --git a/exporter/elasticsearchexporter/model_test.go b/exporter/elasticsearchexporter/model_test.go index 384b14be987d..05c052d8e625 100644 --- a/exporter/elasticsearchexporter/model_test.go +++ b/exporter/elasticsearchexporter/model_test.go @@ -1241,4 +1241,10 @@ func TestEncodeLogBodyMapMode(t *testing.T) { "key.a.b": "b", "pi": 3.14 }`, string(got)) + + // invalid body map + logRecord.Body().SetEmptySlice() + _, err = m.encodeLogBodyMapMode(logRecord) + require.Error(t, err) + require.ErrorIs(t, err, ErrInvalidTypeForBodyMapMode) } From 2e66c7898208b25cb7899dad608f85e75f563f78 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Tue, 15 Oct 2024 08:57:35 -0300 Subject: [PATCH 13/17] remove test for invalid body type, covered by another test case --- exporter/elasticsearchexporter/exporter_test.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index 7b9b410cd41d..df2730d81f4f 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -88,14 +88,6 @@ func TestExporterLogs(t *testing.T) { body func() pcommon.Value expected string }{ - { - name: "body not a map", - body: func() pcommon.Value { - body := pcommon.NewValueInt(42) - return body - }, - expected: `{}`, - }, { name: "flat", body: func() pcommon.Value { From 46b00e931dc92be1d924c7dd27b198255a1589b6 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Tue, 15 Oct 2024 09:11:34 -0300 Subject: [PATCH 14/17] continue --- exporter/elasticsearchexporter/exporter.go | 1 + 1 file changed, 1 insertion(+) diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index 1ab09c680488..1e2e6ba1023c 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -132,6 +132,7 @@ func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs) if errors.Is(err, ErrInvalidTypeForBodyMapMode) { errs = append(errs, fmt.Errorf("dropping log record: %w", err)) + continue } errs = append(errs, err) From 2ffef388d8dc98f167e50e35328e852ca0556b06 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Tue, 15 Oct 2024 09:56:29 -0300 Subject: [PATCH 15/17] remove Eventually to make the unlambda linter happy --- exporter/elasticsearchexporter/exporter_test.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index df2730d81f4f..45662bd77676 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -191,9 +191,9 @@ func TestExporterLogs(t *testing.T) { bodyMap.PutInt("a", 42) rec := newBulkRecorder() - var done atomic.Bool + done := make(chan struct{}, 1) server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { - defer done.Store(true) + defer close(done) rec.Record(docs) assert.Len(t, docs, 1) assert.JSONEq(t, `{"a":42}`, string(docs[0].Document)) @@ -206,9 +206,12 @@ func TestExporterLogs(t *testing.T) { err := exporter.ConsumeLogs(context.Background(), logs) assert.ErrorContains(t, err, `dropping log record: failed to encode log event: invalid log record body type for 'bodymap' mapping mode: "Slice"`) - assert.Eventually(t, func() bool { - return done.Load() - }, 100*time.Millisecond, time.Millisecond, "timeout waiting for valid log to be processed") + + select { + case <-done: + case <-time.After(100 * time.Millisecond): + t.Error("timeout waiting for valid log to be processed") + } }) t.Run("publish with dedot", func(t *testing.T) { From 5a032ec21a7d280803d7c952a0339409525d5ac3 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Tue, 15 Oct 2024 10:29:15 -0300 Subject: [PATCH 16/17] use WaitItems instead of channel --- exporter/elasticsearchexporter/exporter_test.go | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index 45662bd77676..cf23530f0385 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -191,10 +191,8 @@ func TestExporterLogs(t *testing.T) { bodyMap.PutInt("a", 42) rec := newBulkRecorder() - done := make(chan struct{}, 1) server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { - defer close(done) - rec.Record(docs) + defer rec.Record(docs) assert.Len(t, docs, 1) assert.JSONEq(t, `{"a":42}`, string(docs[0].Document)) return itemsAllOK(docs) @@ -206,12 +204,7 @@ func TestExporterLogs(t *testing.T) { err := exporter.ConsumeLogs(context.Background(), logs) assert.ErrorContains(t, err, `dropping log record: failed to encode log event: invalid log record body type for 'bodymap' mapping mode: "Slice"`) - - select { - case <-done: - case <-time.After(100 * time.Millisecond): - t.Error("timeout waiting for valid log to be processed") - } + rec.WaitItems(1) }) t.Run("publish with dedot", func(t *testing.T) { From eb13f374bb98c0f0389da41276203a88deae35a7 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Tue, 15 Oct 2024 10:59:15 -0300 Subject: [PATCH 17/17] log error --- exporter/elasticsearchexporter/exporter.go | 3 ++- exporter/elasticsearchexporter/exporter_test.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index 1e2e6ba1023c..da50b7e6d9b0 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -18,6 +18,7 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/objmodel" ) @@ -131,7 +132,7 @@ func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs) } if errors.Is(err, ErrInvalidTypeForBodyMapMode) { - errs = append(errs, fmt.Errorf("dropping log record: %w", err)) + e.Logger.Warn("dropping log record", zap.Error(err)) continue } diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index cf23530f0385..ece77d95fe3e 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -203,7 +203,7 @@ func TestExporterLogs(t *testing.T) { }) err := exporter.ConsumeLogs(context.Background(), logs) - assert.ErrorContains(t, err, `dropping log record: failed to encode log event: invalid log record body type for 'bodymap' mapping mode: "Slice"`) + assert.NoError(t, err) rec.WaitItems(1) })