Skip to content

Commit

Permalink
Use done channel for shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
crobert-1 committed Apr 3, 2024
1 parent a930320 commit 7233036
Show file tree
Hide file tree
Showing 17 changed files with 92 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestSerializeMetric(t *testing.T) {
gaugeDp := metric.SetEmptyGauge().DataPoints().AppendEmpty()
gaugeDp.SetIntValue(3)

prev := ttlmap.New(1, 1)
prev := ttlmap.New(1, 1, make(chan struct{}))

serialized, err := SerializeMetric(logger, "prefix", metric, defaultDims, staticDims, prev)
assert.NoError(t, err)
Expand All @@ -50,7 +50,7 @@ func TestSerializeMetric(t *testing.T) {
sumDp := sum.DataPoints().AppendEmpty()
sumDp.SetIntValue(4)

prev := ttlmap.New(1, 1)
prev := ttlmap.New(1, 1, make(chan struct{}))

serialized, err := SerializeMetric(logger, "prefix", metric, defaultDims, staticDims, prev)
assert.NoError(t, err)
Expand All @@ -71,7 +71,7 @@ func TestSerializeMetric(t *testing.T) {
dp.SetSum(6)
dp.SetCount(3)

prev := ttlmap.New(1, 1)
prev := ttlmap.New(1, 1, make(chan struct{}))

serialized, err := SerializeMetric(logger, "prefix", metric, defaultDims, staticDims, prev)
assert.NoError(t, err)
Expand Down
32 changes: 16 additions & 16 deletions exporter/dynatraceexporter/internal/serialization/sum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func Test_serializeSumPoint(t *testing.T) {
dp := pmetric.NewNumberDataPoint()
dp.SetIntValue(5)

got, err := serializeSumPoint("int_sum", "prefix", dimensions.NewNormalizedDimensionList(), pmetric.AggregationTemporalityDelta, dp, ttlmap.New(1, 1))
got, err := serializeSumPoint("int_sum", "prefix", dimensions.NewNormalizedDimensionList(), pmetric.AggregationTemporalityDelta, dp, ttlmap.New(1, 1, make(chan struct{})))
assert.NoError(t, err)
assert.Equal(t, "prefix.int_sum count,delta=5", got)
})
Expand All @@ -33,7 +33,7 @@ func Test_serializeSumPoint(t *testing.T) {
dp.SetDoubleValue(5.5)
dp.SetTimestamp(pcommon.Timestamp(time.Date(2021, 07, 16, 12, 30, 0, 0, time.UTC).UnixNano()))

prev := ttlmap.New(1, 1)
prev := ttlmap.New(1, 1, make(chan struct{}))

got, err := serializeSumPoint("double_sum", "prefix", dimensions.NewNormalizedDimensionList(dimensions.NewDimension("key", "value")), pmetric.AggregationTemporalityDelta, dp, prev)
assert.NoError(t, err)
Expand All @@ -45,7 +45,7 @@ func Test_serializeSumPoint(t *testing.T) {
dp.SetIntValue(5)
dp.SetTimestamp(pcommon.Timestamp(time.Date(2021, 07, 16, 12, 30, 0, 0, time.UTC).UnixNano()))

got, err := serializeSumPoint("int_sum", "prefix", dimensions.NewNormalizedDimensionList(dimensions.NewDimension("key", "value")), pmetric.AggregationTemporalityDelta, dp, ttlmap.New(1, 1))
got, err := serializeSumPoint("int_sum", "prefix", dimensions.NewNormalizedDimensionList(dimensions.NewDimension("key", "value")), pmetric.AggregationTemporalityDelta, dp, ttlmap.New(1, 1, make(chan struct{})))
assert.NoError(t, err)
assert.Equal(t, "prefix.int_sum,key=value count,delta=5 1626438600000", got)
})
Expand All @@ -59,7 +59,7 @@ func Test_serializeSumPoint(t *testing.T) {
dp2.SetDoubleValue(7.0)
dp2.SetTimestamp(pcommon.Timestamp(time.Date(2021, 07, 16, 12, 31, 0, 0, time.UTC).UnixNano()))

prev := ttlmap.New(1, 1)
prev := ttlmap.New(1, 1, make(chan struct{}))

got, err := serializeSumPoint("double_sum", "prefix", dimensions.NewNormalizedDimensionList(dimensions.NewDimension("key", "value")), pmetric.AggregationTemporalityCumulative, dp, prev)
assert.NoError(t, err)
Expand All @@ -79,7 +79,7 @@ func Test_serializeSumPoint(t *testing.T) {
dp2.SetIntValue(10)
dp2.SetTimestamp(pcommon.Timestamp(time.Date(2021, 07, 16, 12, 31, 0, 0, time.UTC).UnixNano()))

prev := ttlmap.New(1, 1)
prev := ttlmap.New(1, 1, make(chan struct{}))

got, err := serializeSumPoint("int_sum", "prefix", dimensions.NewNormalizedDimensionList(dimensions.NewDimension("key", "value")), pmetric.AggregationTemporalityCumulative, dp, prev)
assert.NoError(t, err)
Expand Down Expand Up @@ -115,7 +115,7 @@ func Test_serializeSumPoint(t *testing.T) {
dp4.Attributes().PutStr("sort", "unstable")
dp4.SetTimestamp(pcommon.Timestamp(time.Date(2021, 07, 16, 12, 30, 0, 0, time.UTC).UnixNano()))

prev := ttlmap.New(1, 1)
prev := ttlmap.New(1, 1, make(chan struct{}))

got, err := serializeSumPoint("int_sum", "prefix", dimensions.NewNormalizedDimensionList(dimensions.NewDimension("key", "a")), pmetric.AggregationTemporalityCumulative, dp, prev)
got2, err2 := serializeSumPoint("int_sum", "prefix", dimensions.NewNormalizedDimensionList(dimensions.NewDimension("key", "b")), pmetric.AggregationTemporalityCumulative, dp2, prev)
Expand All @@ -141,7 +141,7 @@ func Test_serializeSumPoint(t *testing.T) {
dp2.SetIntValue(5)
dp2.SetTimestamp(pcommon.Timestamp(time.Date(2021, 07, 16, 12, 29, 0, 0, time.UTC).UnixNano()))

prev := ttlmap.New(1, 1)
prev := ttlmap.New(1, 1, make(chan struct{}))

got, err := serializeSumPoint("int_sum", "prefix", dimensions.NewNormalizedDimensionList(dimensions.NewDimension("key", "value")), pmetric.AggregationTemporalityCumulative, dp, prev)
assert.NoError(t, err)
Expand All @@ -165,7 +165,7 @@ func Test_serializeSum(t *testing.T) {
sum := metric.SetEmptySum()
sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
sum.SetIsMonotonic(false)
prev := ttlmap.New(10, 10)
prev := ttlmap.New(10, 10, make(chan struct{}))

zapCore, observedLogs := observer.New(zap.WarnLevel)
logger := zap.New(zapCore)
Expand Down Expand Up @@ -197,7 +197,7 @@ func Test_serializeSum(t *testing.T) {
// not checking Double, this is done in Test_serializeSumPoint
dp.SetIntValue(12)

prev := ttlmap.New(10, 10)
prev := ttlmap.New(10, 10, make(chan struct{}))

zapCore, observedLogs := observer.New(zap.WarnLevel)
logger := zap.New(zapCore)
Expand All @@ -214,7 +214,7 @@ func Test_serializeSum(t *testing.T) {
t.Run("with invalid value logs warning and returns no line", func(t *testing.T) {
dp.SetDoubleValue(math.NaN())

prev := ttlmap.New(10, 10)
prev := ttlmap.New(10, 10, make(chan struct{}))

zapCore, observedLogs := observer.New(zap.WarnLevel)
logger := zap.New(zapCore)
Expand Down Expand Up @@ -250,7 +250,7 @@ func Test_serializeSum(t *testing.T) {
// not checking Int here, this is done in Test_serializeSumPoint
dp.SetDoubleValue(12.3)

prev := ttlmap.New(10, 10)
prev := ttlmap.New(10, 10, make(chan struct{}))

zapCore, observedLogs := observer.New(zap.WarnLevel)
logger := zap.New(zapCore)
Expand All @@ -269,7 +269,7 @@ func Test_serializeSum(t *testing.T) {
t.Run("with invalid value logs warning and returns no line", func(t *testing.T) {
dp.SetDoubleValue(math.NaN())

prev := ttlmap.New(10, 10)
prev := ttlmap.New(10, 10, make(chan struct{}))

zapCore, observedLogs := observer.New(zap.WarnLevel)
logger := zap.New(zapCore)
Expand Down Expand Up @@ -310,7 +310,7 @@ func Test_serializeSum(t *testing.T) {
dp1.SetDoubleValue(5.2)
dp2.SetDoubleValue(5.7)

prev := ttlmap.New(10, 10)
prev := ttlmap.New(10, 10, make(chan struct{}))

zapCore, observedLogs := observer.New(zap.WarnLevel)
logger := zap.New(zapCore)
Expand All @@ -330,7 +330,7 @@ func Test_serializeSum(t *testing.T) {
dp1.SetDoubleValue(5.2)
dp2.SetDoubleValue(math.NaN())

prev := ttlmap.New(10, 10)
prev := ttlmap.New(10, 10, make(chan struct{}))

zapCore, observedLogs := observer.New(zap.WarnLevel)
logger := zap.New(zapCore)
Expand All @@ -357,7 +357,7 @@ func Test_serializeSum(t *testing.T) {
dp1.SetDoubleValue(5.2)
dp2.SetIntValue(5)

prev := ttlmap.New(10, 10)
prev := ttlmap.New(10, 10, make(chan struct{}))

zapCore, observedLogs := observer.New(zap.WarnLevel)
logger := zap.New(zapCore)
Expand Down Expand Up @@ -388,7 +388,7 @@ func Test_convertTotalCounterToDelta_notMutating(t *testing.T) {
dp.Attributes().PutStr("attr1", "val1")
orig := pmetric.NewNumberDataPoint()
dp.CopyTo(orig)
_, err := convertTotalCounterToDelta("m", "prefix", dimensions.NormalizedDimensionList{}, dp, ttlmap.New(1, 1))
_, err := convertTotalCounterToDelta("m", "prefix", dimensions.NormalizedDimensionList{}, dp, ttlmap.New(1, 1, make(chan struct{})))
assert.NoError(t, err)
assert.Equal(t, orig, dp) // make sure the original data point is not mutated
}
2 changes: 1 addition & 1 deletion exporter/dynatraceexporter/metrics_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func newMetricsExporter(params exporter.CreateSettings, cfg *config.Config) *met

staticDimensions := dimensions.NewNormalizedDimensionList(dimensions.NewDimension("dt.metrics.source", "opentelemetry"))

prevPts := ttlmap.New(cSweepIntervalSeconds, cMaxAgeSeconds)
prevPts := ttlmap.New(cSweepIntervalSeconds, cMaxAgeSeconds, make(chan struct{}))
prevPts.Start()

return &metricsExporter{
Expand Down
2 changes: 1 addition & 1 deletion exporter/dynatraceexporter/metrics_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func Test_SumMetrics(t *testing.T) {
// reset the export buffer for the HTTP client
sent = "nothing sent"

prevPts := ttlmap.New(cSweepIntervalSeconds, cMaxAgeSeconds)
prevPts := ttlmap.New(cSweepIntervalSeconds, cMaxAgeSeconds, make(chan struct{}))

// set up the exporter
exp := &metricsExporter{
Expand Down
4 changes: 2 additions & 2 deletions exporter/signalfxexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ type DimensionClientConfig struct {
Timeout time.Duration `mapstructure:"timeout"`
}

func (cfg *Config) getMetricTranslator(logger *zap.Logger) (*translation.MetricTranslator, error) {
func (cfg *Config) getMetricTranslator(logger *zap.Logger, done chan struct{}) (*translation.MetricTranslator, error) {
rules := defaultTranslationRules
if cfg.TranslationRules != nil {
// Previous way to disable default translation rules.
Expand All @@ -166,7 +166,7 @@ func (cfg *Config) getMetricTranslator(logger *zap.Logger) (*translation.MetricT
if cfg.DisableDefaultTranslationRules {
rules = []translation.Rule{}
}
metricTranslator, err := translation.NewMetricTranslator(rules, cfg.DeltaTranslationTTL)
metricTranslator, err := translation.NewMetricTranslator(rules, cfg.DeltaTranslationTTL, done)
if err != nil {
return nil, fmt.Errorf("invalid \"%s\": %w", translationRulesConfigKey, err)
}
Expand Down
11 changes: 6 additions & 5 deletions exporter/signalfxexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ func TestLoadConfig(t *testing.T) {
}

func TestConfigGetMetricTranslator(t *testing.T) {
done := make(chan struct{})
tests := []struct {
name string
cfg *Config
Expand All @@ -299,7 +300,7 @@ func TestConfigGetMetricTranslator(t *testing.T) {
DeltaTranslationTTL: 3600,
},
want: func() *translation.MetricTranslator {
translator, err := translation.NewMetricTranslator(defaultTranslationRules, 3600)
translator, err := translation.NewMetricTranslator(defaultTranslationRules, 3600, done)
require.NoError(t, err)
return translator
}(),
Expand All @@ -311,7 +312,7 @@ func TestConfigGetMetricTranslator(t *testing.T) {
DeltaTranslationTTL: 3600,
},
want: func() *translation.MetricTranslator {
translator, err := translation.NewMetricTranslator([]translation.Rule{}, 3600)
translator, err := translation.NewMetricTranslator([]translation.Rule{}, 3600, done)
require.NoError(t, err)
return translator
}(),
Expand All @@ -323,7 +324,7 @@ func TestConfigGetMetricTranslator(t *testing.T) {
DeltaTranslationTTL: 3600,
},
want: func() *translation.MetricTranslator {
translator, err := translation.NewMetricTranslator([]translation.Rule{}, 3600)
translator, err := translation.NewMetricTranslator([]translation.Rule{}, 3600, done)
require.NoError(t, err)
return translator
}(),
Expand All @@ -336,7 +337,7 @@ func TestConfigGetMetricTranslator(t *testing.T) {
DeltaTranslationTTL: 3600,
},
want: func() *translation.MetricTranslator {
translator, err := translation.NewMetricTranslator([]translation.Rule{}, 3600)
translator, err := translation.NewMetricTranslator([]translation.Rule{}, 3600, done)
require.NoError(t, err)
return translator
}(),
Expand All @@ -358,7 +359,7 @@ func TestConfigGetMetricTranslator(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := tt.cfg.getMetricTranslator(zap.NewNop())
got, err := tt.cfg.getMetricTranslator(zap.NewNop(), done)
if tt.wantErr {
assert.Error(t, err)
return
Expand Down
2 changes: 1 addition & 1 deletion exporter/signalfxexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func newSignalFxExporter(
return nil, errors.New("nil config")
}

metricTranslator, err := config.getMetricTranslator(createSettings.TelemetrySettings.Logger)
metricTranslator, err := config.getMetricTranslator(createSettings.TelemetrySettings.Logger, make(chan struct{}))
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion exporter/signalfxexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1417,7 +1417,7 @@ func TestTLSIngestConnection(t *testing.T) {
}

func TestDefaultSystemCPUTimeExcludedAndTranslated(t *testing.T) {
translator, err := translation.NewMetricTranslator(defaultTranslationRules, 3600)
translator, err := translation.NewMetricTranslator(defaultTranslationRules, 3600, make(chan struct{}))
require.NoError(t, err)
converter, err := translation.NewMetricsConverter(zap.NewNop(), translator, defaultExcludeMetrics, nil, "_-.", false, true)
require.NoError(t, err)
Expand Down
6 changes: 3 additions & 3 deletions exporter/signalfxexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func TestCreateMetricsExporter_CustomConfig(t *testing.T) {
func TestDefaultTranslationRules(t *testing.T) {
rules := defaultTranslationRules
require.NotNil(t, rules, "rules are nil")
tr, err := translation.NewMetricTranslator(rules, 1)
tr, err := translation.NewMetricTranslator(rules, 1, make(chan struct{}))
require.NoError(t, err)
data := testMetricsData(false)

Expand Down Expand Up @@ -475,7 +475,7 @@ func TestDefaultDiskTranslations(t *testing.T) {
func testGetTranslator(t *testing.T) *translation.MetricTranslator {
rules := defaultTranslationRules
require.NotNil(t, rules, "rules are nil")
tr, err := translation.NewMetricTranslator(rules, 3600)
tr, err := translation.NewMetricTranslator(rules, 3600, make(chan struct{}))
require.NoError(t, err)
return tr
}
Expand Down Expand Up @@ -610,7 +610,7 @@ func TestDefaultExcludes_not_translated(t *testing.T) {
func BenchmarkMetricConversion(b *testing.B) {
rules := defaultTranslationRules
require.NotNil(b, rules, "rules are nil")
tr, err := translation.NewMetricTranslator(rules, 1)
tr, err := translation.NewMetricTranslator(rules, 1, make(chan struct{}))
require.NoError(b, err)

c, err := translation.NewMetricsConverter(zap.NewNop(), tr, nil, nil, "", false, true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestGetDimensionUpdateFromMetadata(t *testing.T) {
Action: translation.ActionRenameDimensionKeys,
Mapping: map[string]string{"name": "translated_name"},
},
}, 1)
}, 1, make(chan struct{}))
type args struct {
metadata metadata.MetadataUpdate
metricTranslator *translation.MetricTranslator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1108,7 +1108,7 @@ func TestMetricDataToSignalFxV2WithTranslation(t *testing.T) {
"old.dim": "new.dim",
},
},
}, 1)
}, 1, make(chan struct{}))
require.NoError(t, err)

md := pmetric.NewMetrics()
Expand Down Expand Up @@ -1147,7 +1147,7 @@ func TestDimensionKeyCharsWithPeriod(t *testing.T) {
"old.dim.with.periods": "new.dim.with.periods",
},
},
}, 1)
}, 1, make(chan struct{}))
require.NoError(t, err)

md := pmetric.NewMetrics()
Expand Down Expand Up @@ -1344,7 +1344,7 @@ func TestMetricsConverter_ConvertDimension(t *testing.T) {
"d.i.m": "di.m",
},
},
}, 0)
}, 0, make(chan struct{}))
return t
}(),
nonAlphanumericDimChars: "_-",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ type deltaTranslator struct {
prevPts *ttlmap.TTLMap
}

func newDeltaTranslator(ttl int64) *deltaTranslator {
func newDeltaTranslator(ttl int64, done chan struct{}) *deltaTranslator {
sweepIntervalSeconds := ttl / 2
if sweepIntervalSeconds == 0 {
sweepIntervalSeconds = 1
}
m := ttlmap.New(sweepIntervalSeconds, ttl)
m := ttlmap.New(sweepIntervalSeconds, ttl, done)
m.Start()
return &deltaTranslator{prevPts: m}
}
Expand Down
4 changes: 2 additions & 2 deletions exporter/signalfxexporter/internal/translation/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ type MetricTranslator struct {
deltaTranslator *deltaTranslator
}

func NewMetricTranslator(rules []Rule, ttl int64) (*MetricTranslator, error) {
func NewMetricTranslator(rules []Rule, ttl int64, done chan struct{}) (*MetricTranslator, error) {
err := validateTranslationRules(rules)
if err != nil {
return nil, err
Expand All @@ -240,7 +240,7 @@ func NewMetricTranslator(rules []Rule, ttl int64) (*MetricTranslator, error) {
return &MetricTranslator{
rules: rules,
dimensionsMap: createDimensionsMap(rules),
deltaTranslator: newDeltaTranslator(ttl),
deltaTranslator: newDeltaTranslator(ttl, done),
}, nil
}

Expand Down
Loading

0 comments on commit 7233036

Please sign in to comment.