Skip to content

Commit

Permalink
[receiver/sqlquery] set ObservedTimestamp on collected logs (#23777)
Browse files Browse the repository at this point in the history
Fixes #23776
  • Loading branch information
andrzej-stencel authored Aug 14, 2023
1 parent c36d836 commit 3b1ab22
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 1 deletion.
20 changes: 20 additions & 0 deletions .chloggen/sqlquery-observedtimestamp.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.
# If your change doesn't affect end users, such as a test fix or a tooling change,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: receiver/sqlquery

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Set ObservedTimestamp on collected logs

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [23776]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
6 changes: 5 additions & 1 deletion receiver/sqlqueryreceiver/logs_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/extension/experimental/storage"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/receiver"
"go.uber.org/multierr"
Expand Down Expand Up @@ -270,6 +271,7 @@ func (queryReceiver *logsQueryReceiver) collect(ctx context.Context) (plog.Logs,

var rows []stringMap
var err error
observedAt := pcommon.NewTimestampFromTime(time.Now())
if queryReceiver.query.TrackingColumn != "" {
rows, err = queryReceiver.client.queryRows(ctx, queryReceiver.trackingValue)
} else {
Expand All @@ -283,7 +285,9 @@ func (queryReceiver *logsQueryReceiver) collect(ctx context.Context) (plog.Logs,
scopeLogs := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords()
for logsConfigIndex, logsConfig := range queryReceiver.query.Logs {
for _, row := range rows {
rowToLog(row, logsConfig, scopeLogs.AppendEmpty())
logRecord := scopeLogs.AppendEmpty()
rowToLog(row, logsConfig, logRecord)
logRecord.SetObservedTimestamp(observedAt)
if logsConfigIndex == 0 {
errs = multierr.Append(errs, queryReceiver.storeTrackingValue(ctx, row))
}
Expand Down
51 changes: 51 additions & 0 deletions receiver/sqlqueryreceiver/logs_receiver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package sqlqueryreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/sqlqueryreceiver"

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/pcommon"
)

func TestLogsQueryReceiver_Collect(t *testing.T) {
now := time.Now()

fakeClient := &fakeDBClient{
stringMaps: [][]stringMap{
{{"col1": "42"}, {"col1": "63"}},
},
}
queryReceiver := logsQueryReceiver{
client: fakeClient,
query: Query{
Logs: []LogsCfg{
{
BodyColumn: "col1",
},
},
},
}
logs, err := queryReceiver.collect(context.Background())
assert.NoError(t, err)
assert.NotNil(t, logs)
assert.Equal(t, 2, logs.LogRecordCount())

logRecord := logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)
assert.Equal(t, "42", logRecord.Body().Str())
assert.GreaterOrEqual(t, logRecord.ObservedTimestamp(), pcommon.NewTimestampFromTime(now))

logRecord = logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(1)
assert.Equal(t, "63", logRecord.Body().Str())
assert.NotEqual(t, logRecord.ObservedTimestamp(), pcommon.NewTimestampFromTime(now))

assert.Equal(t,
logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).ObservedTimestamp(),
logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(1).ObservedTimestamp(),
"Observed timestamps of all log records collected in a single scrape should be equal",
)
}

0 comments on commit 3b1ab22

Please sign in to comment.