[exporter/elasticsearch] Add mapping mode bodymap #35637

Merged on Oct 17, 2024 (19 commits)
27 changes: 27 additions & 0 deletions .chloggen/feature_elasticsearch_mapping_bodymap.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Introduce an experimental bodymap mapping mode for logs

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35444]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
9 changes: 7 additions & 2 deletions exporter/elasticsearchexporter/README.md
@@ -155,14 +155,19 @@ behaviours, which may be configured through the following settings:
- `none`: Use original fields and event structure from the OTLP event.
- `ecs`: Try to map fields to [Elastic Common Schema (ECS)][ECS]
- `otel`: Elastic's preferred "OTel-native" mapping mode. Uses original fields and event structure from the OTLP event.
- :warning: This mode's behavior is unstable, it is currently is experimental and undergoing changes.
- :warning: This mode's behavior is unstable, it is currently experimental and undergoing changes.
- There's a special treatment for the following attributes: `data_stream.type`, `data_stream.dataset`, `data_stream.namespace`. Instead of serializing these values under the `*attributes.*` namespace, they're put at the root of the document, to conform with the conventions of the data stream naming scheme that maps these as `constant_keyword` fields.
- `data_stream.dataset` will always be appended with `.otel`. It is recommended to use with `*_dynamic_index.enabled: true` to route documents to data stream `${data_stream.type}-${data_stream.dataset}-${data_stream.namespace}`.
- Span events are stored in separate documents. They will be routed with `data_stream.type` set to `logs` if `traces_dynamic_index::enabled` is `true`.

- `raw`: Omit the `Attributes.` string prefixed to field names for log and
span attributes as well as omit the `Events.` string prefixed to
field names for span events.
field names for span events.
- `bodymap`: Provides fine-grained control over the final documents to be ingested.
:warning: This mode's behavior is unstable, it is currently experimental and undergoing changes.
It works only for logs where the log record body is a map. Each LogRecord
body is serialized to JSON as-is and becomes a separate document for ingestion.
If the log record body is not a map, the exporter will log a warning and drop the log record.
- `dedup` (DEPRECATED). This configuration is deprecated and non-operational,
and will be removed in the future. Object keys are always deduplicated to
avoid Elasticsearch rejecting documents.
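
As a rough illustration of the `bodymap` behaviour described above, the following sketch uses only the Go standard library. It assumes `encoding/json` and a plain `map[string]any` as stand-ins for the exporter's JSON encoder and the pdata body type; `encodeBodyMap` is a hypothetical name, not part of the exporter.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// encodeBodyMap is a hypothetical helper: a map body is serialized as-is and
// becomes the whole document; any other body type is rejected.
func encodeBodyMap(body any) ([]byte, error) {
	m, ok := body.(map[string]any)
	if !ok {
		return nil, fmt.Errorf("body is %T, not a map", body)
	}
	// No prefixing, de-dotting, or restructuring: keys are written unchanged.
	return json.Marshal(m)
}

func main() {
	doc, err := encodeBodyMap(map[string]any{
		"a":      1,
		"a.b":    2,
		"nested": map[string]any{"c": 3},
	})
	fmt.Println(string(doc), err)
	// {"a":1,"a.b":2,"nested":{"c":3}} <nil>

	// A slice body cannot become a document, so the caller would drop it.
	_, err = encodeBodyMap([]any{1, 2})
	fmt.Println(err)
}
```

Dotted keys such as `a.b` stay flat in the output, which matches the "dotted key" case in the tests added by this change.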
4 changes: 4 additions & 0 deletions exporter/elasticsearchexporter/config.go
@@ -207,6 +207,7 @@ const (
MappingECS
MappingOTel
MappingRaw
MappingBodyMap
)

var (
@@ -224,6 +225,8 @@ func (m MappingMode) String() string {
return "otel"
case MappingRaw:
return "raw"
case MappingBodyMap:
return "bodymap"
default:
return ""
}
@@ -236,6 +239,7 @@ var mappingModes = func() map[string]MappingMode {
MappingECS,
MappingOTel,
MappingRaw,
MappingBodyMap,
} {
table[strings.ToLower(m.String())] = m
}
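
The three hunks above touch the same enum in three places: the constant, its `String()` case, and the list used to build the lookup table. A minimal standalone sketch of that pattern follows; the lowercase names (`mappingMode`, `modeBodyMap`, `modes`) are assumptions made for illustration and are not the exporter's identifiers.

```go
package main

import (
	"fmt"
	"strings"
)

// mappingMode is a hypothetical copy of the enum pattern used in config.go.
type mappingMode int

const (
	modeNone mappingMode = iota
	modeECS
	modeOTel
	modeRaw
	modeBodyMap
)

// String returns the configuration name of a mode, or "" if it is unknown.
func (m mappingMode) String() string {
	switch m {
	case modeNone:
		return "none"
	case modeECS:
		return "ecs"
	case modeOTel:
		return "otel"
	case modeRaw:
		return "raw"
	case modeBodyMap:
		return "bodymap"
	default:
		return ""
	}
}

// modes maps configuration strings back to enum values. It is derived from
// String(), so a mode missing a String() case would be registered under "".
var modes = func() map[string]mappingMode {
	table := map[string]mappingMode{}
	for _, m := range []mappingMode{modeNone, modeECS, modeOTel, modeRaw, modeBodyMap} {
		table[strings.ToLower(m.String())] = m
	}
	return table
}()

func main() {
	m, ok := modes["bodymap"]
	fmt.Println(m, ok) // bodymap true
}
```

Because the table is built from `String()`, forgetting any one of the three additions would leave the new mode unreachable from configuration.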
5 changes: 5 additions & 0 deletions exporter/elasticsearchexporter/exporter.go
@@ -131,6 +131,11 @@ func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs)
return cerr
}

if errors.Is(err, ErrInvalidTypeForBodyMapMode) {
e.Logger.Warn("dropping log record", zap.Error(err))
continue
}

errs = append(errs, err)
}
}
125 changes: 125 additions & 0 deletions exporter/elasticsearchexporter/exporter_test.go
@@ -81,6 +81,131 @@ func TestExporterLogs(t *testing.T) {
rec.WaitItems(1)
})

	t.Run("publish with bodymap encoding", func(t *testing.T) {
		tableTests := []struct {
			name     string
			body     func() pcommon.Value
			expected string
		}{
			{
				name: "flat",
				body: func() pcommon.Value {
					body := pcommon.NewValueMap()
					m := body.Map()
					m.PutStr("@timestamp", "2024-03-12T20:00:41.123456789Z")
					m.PutInt("id", 1)
					m.PutStr("key", "value")
					return body
				},
				expected: `{"@timestamp":"2024-03-12T20:00:41.123456789Z","id":1,"key":"value"}`,
			},
			{
				name: "dotted key",
				body: func() pcommon.Value {
					body := pcommon.NewValueMap()
					m := body.Map()
					m.PutInt("a", 1)
					m.PutInt("a.b", 2)
					m.PutInt("a.b.c", 3)
					return body
				},
				expected: `{"a":1,"a.b":2,"a.b.c":3}`,
			},
			{
				name: "slice",
				body: func() pcommon.Value {
					body := pcommon.NewValueMap()
					m := body.Map()
					s := m.PutEmptySlice("a")
					for i := 0; i < 2; i++ {
						s.AppendEmpty().SetInt(int64(i))
					}
					return body
				},
				expected: `{"a":[0,1]}`,
			},
			{
				name: "inner map",
				body: func() pcommon.Value {
					body := pcommon.NewValueMap()
					m := body.Map()
					m1 := m.PutEmptyMap("a")
					m1.PutInt("b", 1)
					m1.PutInt("c", 2)
					return body
				},
				expected: `{"a":{"b":1,"c":2}}`,
			},
			{
				name: "nested map",
				body: func() pcommon.Value {
					body := pcommon.NewValueMap()
					m := body.Map()
					m1 := m.PutEmptyMap("a")
					m2 := m1.PutEmptyMap("b")
					m2.PutInt("c", 1)
					m2.PutInt("d", 2)
					return body
				},
				expected: `{"a":{"b":{"c":1,"d":2}}}`,
			},
		}

		for _, tt := range tableTests {
			t.Run(tt.name, func(t *testing.T) {
				rec := newBulkRecorder()
				server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
					rec.Record(docs)
					assert.JSONEq(t, tt.expected, string(docs[0].Document))
					return itemsAllOK(docs)
				})

				exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) {
					cfg.Mapping.Mode = "bodymap"
				})
				logs := plog.NewLogs()
				resourceLogs := logs.ResourceLogs().AppendEmpty()
				scopeLogs := resourceLogs.ScopeLogs().AppendEmpty()
				logRecords := scopeLogs.LogRecords()
				logRecord := logRecords.AppendEmpty()
				tt.body().CopyTo(logRecord.Body())

				mustSendLogs(t, exporter, logs)
				rec.WaitItems(1)
			})
		}
	})

	t.Run("drops log records for bodymap mode if body is not a map", func(t *testing.T) {
		logs := plog.NewLogs()
		resourceLogs := logs.ResourceLogs().AppendEmpty()
		scopeLogs := resourceLogs.ScopeLogs().AppendEmpty()
		logRecords := scopeLogs.LogRecords()

		// Invalid body type should be dropped.
		logRecords.AppendEmpty().Body().SetEmptySlice()

		// We should still process the valid records in the batch.
		bodyMap := logRecords.AppendEmpty().Body().SetEmptyMap()
		bodyMap.PutInt("a", 42)

		rec := newBulkRecorder()
		server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
			defer rec.Record(docs)
			assert.Len(t, docs, 1)
			assert.JSONEq(t, `{"a":42}`, string(docs[0].Document))
			return itemsAllOK(docs)
		})

		exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) {
			cfg.Mapping.Mode = "bodymap"
		})

		err := exporter.ConsumeLogs(context.Background(), logs)
		assert.NoError(t, err)
		rec.WaitItems(1)
	})

t.Run("publish with dedot", func(t *testing.T) {
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
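
The tests above compare documents with `assert.JSONEq` rather than by string equality, so the key order produced by the encoder does not matter. A rough standard-library approximation of that comparison is sketched below; `jsonEqual` is a hypothetical helper written for this note and is not the testify implementation.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// jsonEqual reports whether two JSON texts decode to the same value.
// Object key order and insignificant whitespace are ignored; array order
// is significant.
func jsonEqual(a, b string) (bool, error) {
	var va, vb any
	if err := json.Unmarshal([]byte(a), &va); err != nil {
		return false, fmt.Errorf("first input: %w", err)
	}
	if err := json.Unmarshal([]byte(b), &vb); err != nil {
		return false, fmt.Errorf("second input: %w", err)
	}
	return reflect.DeepEqual(va, vb), nil
}

func main() {
	same, err := jsonEqual(`{"a":{"b":1,"c":2}}`, `{ "a": { "c": 2, "b": 1 } }`)
	fmt.Println(same, err)

	same, err = jsonEqual(`{"a":[0,1]}`, `{"a":[1,0]}`)
	fmt.Println(same, err)
}
```

This is why the expected strings in the table can be written in any key order without making the tests brittle.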
2 changes: 1 addition & 1 deletion exporter/elasticsearchexporter/go.mod
@@ -7,6 +7,7 @@ require (
github.com/elastic/go-docappender/v2 v2.3.0
github.com/elastic/go-elasticsearch/v7 v7.17.10
github.com/elastic/go-structform v0.0.12
github.com/json-iterator/go v1.1.12
github.com/lestrrat-go/strftime v1.1.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.111.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.111.0
@@ -43,7 +44,6 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.10 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
14 changes: 14 additions & 0 deletions exporter/elasticsearchexporter/model.go
@@ -15,6 +15,7 @@ import (
"slices"
"time"

jsoniter "github.com/json-iterator/go"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
@@ -65,6 +66,8 @@ var resourceAttrsToPreserve = map[string]bool{
semconv.AttributeHostName: true,
}

var ErrInvalidTypeForBodyMapMode = errors.New("invalid log record body type for 'bodymap' mapping mode")

type mappingModel interface {
encodeLog(pcommon.Resource, string, plog.LogRecord, pcommon.InstrumentationScope, string) ([]byte, error)
encodeSpan(pcommon.Resource, string, ptrace.Span, pcommon.InstrumentationScope, string) ([]byte, error)
@@ -107,6 +110,8 @@ func (m *encodeModel) encodeLog(resource pcommon.Resource, resourceSchemaURL str
document = m.encodeLogECSMode(resource, record, scope)
case MappingOTel:
document = m.encodeLogOTelMode(resource, resourceSchemaURL, record, scope, scopeSchemaURL)
case MappingBodyMap:
return m.encodeLogBodyMapMode(record)
default:
document = m.encodeLogDefaultMode(resource, record, scope)
}
@@ -138,6 +143,15 @@ func (m *encodeModel) encodeLogDefaultMode(resource pcommon.Resource, record plo
return document
}

func (m *encodeModel) encodeLogBodyMapMode(record plog.LogRecord) ([]byte, error) {
	body := record.Body()
	if body.Type() != pcommon.ValueTypeMap {
		return nil, fmt.Errorf("%w: %q", ErrInvalidTypeForBodyMapMode, body.Type())
	}

	return jsoniter.Marshal(body.Map().AsRaw())
}

func (m *encodeModel) encodeLogOTelMode(resource pcommon.Resource, resourceSchemaURL string, record plog.LogRecord, scope pcommon.InstrumentationScope, scopeSchemaURL string) objmodel.Document {
var document objmodel.Document

40 changes: 40 additions & 0 deletions exporter/elasticsearchexporter/model_test.go
@@ -1225,3 +1225,43 @@ func TestEncodeLogScalarObjectConflict(t *testing.T) {
fooValue = gjson.GetBytes(encoded, "Attributes\\.foo\\.value")
assert.Equal(t, "foovalue", fooValue.Str)
}

func TestEncodeLogBodyMapMode(t *testing.T) {
	// craft a log record with a body map
	logs := plog.NewLogs()
	resourceLogs := logs.ResourceLogs().AppendEmpty()
	scopeLogs := resourceLogs.ScopeLogs().AppendEmpty()
	logRecords := scopeLogs.LogRecords()
	observedTimestamp := pcommon.Timestamp(time.Now().UnixNano())

	logRecord := logRecords.AppendEmpty()
	logRecord.SetObservedTimestamp(observedTimestamp)

	bodyMap := pcommon.NewMap()
	bodyMap.PutStr("@timestamp", "2024-03-12T20:00:41.123456789Z")
	bodyMap.PutInt("id", 1)
	bodyMap.PutStr("key", "value")
	bodyMap.PutStr("key.a", "a")
	bodyMap.PutStr("key.a.b", "b")
	bodyMap.PutDouble("pi", 3.14)
	bodyMap.CopyTo(logRecord.Body().SetEmptyMap())

	m := encodeModel{}
	got, err := m.encodeLogBodyMapMode(logRecord)
	require.NoError(t, err)

	require.JSONEq(t, `{
		"@timestamp": "2024-03-12T20:00:41.123456789Z",
		"id": 1,
		"key": "value",
		"key.a": "a",
		"key.a.b": "b",
		"pi": 3.14
	}`, string(got))

	// invalid body map
	logRecord.Body().SetEmptySlice()
	_, err = m.encodeLogBodyMapMode(logRecord)
	require.Error(t, err)
	require.ErrorIs(t, err, ErrInvalidTypeForBodyMapMode)
}