From ae6d36bc4001f539433e6b03fcff365d9489bf4d Mon Sep 17 00:00:00 2001 From: Garry Cairns <2401853+garry-cairns@users.noreply.github.com> Date: Thu, 2 Nov 2023 14:02:39 +0000 Subject: [PATCH] Adds duration sampler distinct from latency in supplying two bounds (#26115) **Description:** Adds a bounded duration sampling processor, distinct from the existing latency one in that it has both lower and upper bounds Apologies for this appearing as a pull request out of nothing, my intent had actually been to create a review area against my own fork and raise an issue asking if you'd accept the PR. I think the need here is pretty obvious from the context, though I think it's easy to imagine preferring this to be a change to the existing processor. I raised as a new one as I thought it might make existing behavior cleaner to retain. **Link to tracking Issue:** As above this is a bit of a premature PR since I intended to raise as an issue, and thus there isn't one, but I think it's easy enough to deal with here so leaving open for now and have learned GitHub's ways for the future (I rarely use github). **Testing:** New module so associated tests are added showing all relevant behavior, and passing. **Documentation:** Updated README and example config --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- .chloggen/bounded_duration_sampler.yaml | 27 ++++++ .gitignore | 1 + processor/tailsamplingprocessor/README.md | 4 +- .../tailsamplingprocessor/and_helper_test.go | 2 +- .../composite_helper_test.go | 4 +- processor/tailsamplingprocessor/config.go | 6 +- .../internal/sampling/latency.go | 17 ++-- .../internal/sampling/latency_test.go | 89 ++++++++++++++++++- processor/tailsamplingprocessor/processor.go | 2 +- 9 files changed, 138 insertions(+), 14 deletions(-) create mode 100644 .chloggen/bounded_duration_sampler.yaml diff --git a/.chloggen/bounded_duration_sampler.yaml b/.chloggen/bounded_duration_sampler.yaml new file mode 100644 index 000000000000..1b2bb15c62bd --- /dev/null +++ b/.chloggen/bounded_duration_sampler.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: processor/tailsampling + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: adds optional upper bound duration for sampling + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [26115] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/.gitignore b/.gitignore index fd937a38f1b4..52616038f614 100644 --- a/.gitignore +++ b/.gitignore @@ -38,3 +38,4 @@ integration-coverage.html go.work* +/result diff --git a/processor/tailsamplingprocessor/README.md b/processor/tailsamplingprocessor/README.md index c589e17d4ab5..e5bc9953e73a 100644 --- a/processor/tailsamplingprocessor/README.md +++ b/processor/tailsamplingprocessor/README.md @@ -29,7 +29,7 @@ The following configuration options are required: Multiple policies exist today and it is straight forward to add more. These include: - `always_sample`: Sample all traces -- `latency`: Sample based on the duration of the trace. The duration is determined by looking at the earliest start time and latest end time, without taking into consideration what happened in between. +- `latency`: Sample based on the duration of the trace. The duration is determined by looking at the earliest start time and latest end time, without taking into consideration what happened in between. Supplying no upper bound will result in a policy sampling anything greater than `threshold_ms`. - `numeric_attribute`: Sample based on number attributes (resource and record) - `probabilistic`: Sample a percentage of traces. Read [a comparison with the Probabilistic Sampling Processor](#probabilistic-sampling-processor-compared-to-the-tail-sampling-processor-with-the-probabilistic-policy). - `status_code`: Sample based upon the status code (`OK`, `ERROR` or `UNSET`) @@ -77,7 +77,7 @@ processors: { name: test-policy-2, type: latency, - latency: {threshold_ms: 5000} + latency: {threshold_ms: 5000, upper_threshold_ms: 10000} }, { name: test-policy-3, diff --git a/processor/tailsamplingprocessor/and_helper_test.go b/processor/tailsamplingprocessor/and_helper_test.go index f1cbe8a1e587..e46237ed95ed 100644 --- a/processor/tailsamplingprocessor/and_helper_test.go +++ b/processor/tailsamplingprocessor/and_helper_test.go @@ -30,7 +30,7 @@ func TestAndHelper(t *testing.T) { require.NoError(t, err) expected := sampling.NewAnd(zap.NewNop(), []sampling.PolicyEvaluator{ - sampling.NewLatency(componenttest.NewNopTelemetrySettings(), 100), + sampling.NewLatency(componenttest.NewNopTelemetrySettings(), 100, 0), }) assert.Equal(t, expected, actual) }) diff --git a/processor/tailsamplingprocessor/composite_helper_test.go b/processor/tailsamplingprocessor/composite_helper_test.go index bc7d3cc2053e..578f3be289b9 100644 --- a/processor/tailsamplingprocessor/composite_helper_test.go +++ b/processor/tailsamplingprocessor/composite_helper_test.go @@ -50,11 +50,11 @@ func TestCompositeHelper(t *testing.T) { expected := sampling.NewComposite(zap.NewNop(), 1000, []sampling.SubPolicyEvalParams{ { - Evaluator: sampling.NewLatency(componenttest.NewNopTelemetrySettings(), 100), + Evaluator: sampling.NewLatency(componenttest.NewNopTelemetrySettings(), 100, 0), MaxSpansPerSecond: 250, }, { - Evaluator: sampling.NewLatency(componenttest.NewNopTelemetrySettings(), 200), + Evaluator: sampling.NewLatency(componenttest.NewNopTelemetrySettings(), 200, 0), MaxSpansPerSecond: 500, }, }, sampling.MonotonicClock{}) diff --git a/processor/tailsamplingprocessor/config.go b/processor/tailsamplingprocessor/config.go index 91ae169fd015..008c72c72073 100644 --- a/processor/tailsamplingprocessor/config.go +++ b/processor/tailsamplingprocessor/config.go @@ -87,6 +87,7 @@ type AndSubPolicyCfg struct { sharedPolicyCfg `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct } +// TraceStateCfg holds the common configuration for trace states. type TraceStateCfg struct { // Tag that the filter is going to be matching against. Key string `mapstructure:"key"` @@ -94,6 +95,7 @@ type TraceStateCfg struct { Values []string `mapstructure:"values"` } +// AndCfg holds the common configuration to all and policies. type AndCfg struct { SubPolicyCfg []AndSubPolicyCfg `mapstructure:"and_sub_policy"` } @@ -126,8 +128,10 @@ type PolicyCfg struct { // LatencyCfg holds the configurable settings to create a latency filter sampling policy // evaluator type LatencyCfg struct { - // ThresholdMs in milliseconds. + // Lower bound in milliseconds. Retaining original name for compatibility ThresholdMs int64 `mapstructure:"threshold_ms"` + // Upper bound in milliseconds. + UpperThresholdmsMs int64 `mapstructure:"upper_threshold_ms"` } // NumericAttributeCfg holds the configurable settings to create a numeric attribute filter diff --git a/processor/tailsamplingprocessor/internal/sampling/latency.go b/processor/tailsamplingprocessor/internal/sampling/latency.go index 760b4e9f6768..be87f47165c9 100644 --- a/processor/tailsamplingprocessor/internal/sampling/latency.go +++ b/processor/tailsamplingprocessor/internal/sampling/latency.go @@ -13,17 +13,19 @@ import ( ) type latency struct { - logger *zap.Logger - thresholdMs int64 + logger *zap.Logger + thresholdMs int64 + upperThresholdMs int64 } var _ PolicyEvaluator = (*latency)(nil) // NewLatency creates a policy evaluator sampling traces with a duration higher than a configured threshold -func NewLatency(settings component.TelemetrySettings, thresholdMs int64) PolicyEvaluator { +func NewLatency(settings component.TelemetrySettings, thresholdMs int64, upperThresholdMs int64) PolicyEvaluator { return &latency{ - logger: settings.Logger, - thresholdMs: thresholdMs, + logger: settings.Logger, + thresholdMs: thresholdMs, + upperThresholdMs: upperThresholdMs, } } @@ -47,6 +49,9 @@ func (l *latency) Evaluate(_ context.Context, _ pcommon.TraceID, traceData *Trac } duration := maxTime.AsTime().Sub(minTime.AsTime()) - return duration.Milliseconds() >= l.thresholdMs + if l.upperThresholdMs == 0 { + return duration.Milliseconds() >= l.thresholdMs + } + return (l.thresholdMs < duration.Milliseconds() && duration.Milliseconds() <= l.upperThresholdMs) }), nil } diff --git a/processor/tailsamplingprocessor/internal/sampling/latency_test.go b/processor/tailsamplingprocessor/internal/sampling/latency_test.go index 839feddcbf9d..b5a2537edf65 100644 --- a/processor/tailsamplingprocessor/internal/sampling/latency_test.go +++ b/processor/tailsamplingprocessor/internal/sampling/latency_test.go @@ -15,7 +15,7 @@ import ( ) func TestEvaluate_Latency(t *testing.T) { - filter := NewLatency(componenttest.NewNopTelemetrySettings(), 5000) + filter := NewLatency(componenttest.NewNopTelemetrySettings(), 5000, 0) traceID := pcommon.TraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) now := time.Now() @@ -71,6 +71,93 @@ func TestEvaluate_Latency(t *testing.T) { } } +func TestEvaluate_Bounded_Latency(t *testing.T) { + filter := NewLatency(componenttest.NewNopTelemetrySettings(), 5000, 10000) + + traceID := pcommon.TraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) + now := time.Now() + + cases := []struct { + Desc string + Spans []spanWithTimeAndDuration + Decision Decision + }{ + { + "trace duration shorter than lower bound", + []spanWithTimeAndDuration{ + { + StartTime: now, + Duration: 4500 * time.Millisecond, + }, + }, + NotSampled, + }, + { + "trace duration is equal to lower bound", + []spanWithTimeAndDuration{ + { + StartTime: now, + Duration: 5000 * time.Millisecond, + }, + }, + NotSampled, + }, + { + "trace duration is within lower and upper bounds", + []spanWithTimeAndDuration{ + { + StartTime: now, + Duration: 5001 * time.Millisecond, + }, + }, + Sampled, + }, + { + "trace duration is above upper bound", + []spanWithTimeAndDuration{ + { + StartTime: now, + Duration: 10001 * time.Millisecond, + }, + }, + NotSampled, + }, + { + "trace duration equals upper bound", + []spanWithTimeAndDuration{ + { + StartTime: now, + Duration: 10000 * time.Millisecond, + }, + }, + Sampled, + }, + { + "total trace duration is longer than threshold but every single span is shorter", + []spanWithTimeAndDuration{ + { + StartTime: now, + Duration: 3000 * time.Millisecond, + }, + { + StartTime: now.Add(2500 * time.Millisecond), + Duration: 3000 * time.Millisecond, + }, + }, + Sampled, + }, + } + + for _, c := range cases { + t.Run(c.Desc, func(t *testing.T) { + decision, err := filter.Evaluate(context.Background(), traceID, newTraceWithSpans(c.Spans)) + + assert.NoError(t, err) + assert.Equal(t, decision, c.Decision) + }) + } +} + type spanWithTimeAndDuration struct { StartTime time.Time Duration time.Duration diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index ae1c566886b6..75d5b55559c3 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -155,7 +155,7 @@ func getSharedPolicyEvaluator(settings component.TelemetrySettings, cfg *sharedP return sampling.NewAlwaysSample(settings), nil case Latency: lfCfg := cfg.LatencyCfg - return sampling.NewLatency(settings, lfCfg.ThresholdMs), nil + return sampling.NewLatency(settings, lfCfg.ThresholdMs, lfCfg.UpperThresholdmsMs), nil case NumericAttribute: nafCfg := cfg.NumericAttributeCfg return sampling.NewNumericAttributeFilter(settings, nafCfg.Key, nafCfg.MinValue, nafCfg.MaxValue, nafCfg.InvertMatch), nil