From 79b3528a7dc99b3412e3d8ab71f79f76a3f0ec8c Mon Sep 17 00:00:00 2001 From: Matej Gera Date: Sat, 21 Dec 2019 14:59:34 +0100 Subject: [PATCH 1/3] Introduce simplified export pipeline setup for stdout --- exporter/metric/stdout/example_test.go | 43 ++++++++++++++++++++++++++ exporter/metric/stdout/stdout.go | 17 ++++++++++ 2 files changed, 60 insertions(+) create mode 100644 exporter/metric/stdout/example_test.go diff --git a/exporter/metric/stdout/example_test.go b/exporter/metric/stdout/example_test.go new file mode 100644 index 00000000000..ecaaee73bc9 --- /dev/null +++ b/exporter/metric/stdout/example_test.go @@ -0,0 +1,43 @@ +package stdout_test + +import ( + "context" + "log" + + "go.opentelemetry.io/otel/api/key" + "go.opentelemetry.io/otel/api/metric" + "go.opentelemetry.io/otel/exporter/metric/stdout" +) + +func ExampleNewExportPipeline() { + // Create a meter + pusher, err := stdout.NewExportPipeline(stdout.Options{ + PrettyPrint: true, + DoNotPrintTime: true, + }) + if err != nil { + log.Fatal("Could not initialize stdout exporter:", err) + } + defer pusher.Stop() + + ctx := context.Background() + + key := key.New("key") + meter := pusher.Meter("example") + + // Create and update a single counter: + counter := meter.NewInt64Counter("a.counter", metric.WithKeys(key)) + labels := meter.Labels(key.String("value")) + + counter.Add(ctx, 100, labels) + + // Output: + // { + // "updates": [ + // { + // "name": "a.counter{key=value}", + // "sum": 100 + // } + // ] + // } +} diff --git a/exporter/metric/stdout/stdout.go b/exporter/metric/stdout/stdout.go index 9a25b2c2608..7a34f854b4a 100644 --- a/exporter/metric/stdout/stdout.go +++ b/exporter/metric/stdout/stdout.go @@ -25,6 +25,9 @@ import ( export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregator" + "go.opentelemetry.io/otel/sdk/metric/batcher/ungrouped" + "go.opentelemetry.io/otel/sdk/metric/controller/push" + "go.opentelemetry.io/otel/sdk/metric/selector/simple" ) type Exporter struct { @@ -98,6 +101,20 @@ func New(options Options) (*Exporter, error) { }, nil } +// NewExportPipeline sets up a complete export pipeline with the recommended setup +func NewExportPipeline(options Options) (*push.Controller, error) { + selector := simple.NewWithInexpensiveMeasure() + exporter, err := New(options) + if err != nil { + return nil, err + } + batcher := ungrouped.New(selector, true) + pusher := push.New(batcher, exporter, time.Second) + pusher.Start() + + return pusher, nil +} + func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error { // N.B. Only return one aggError, if any occur. They're likely // to be duplicates of the same error. From 42def0bf3c52e5aedf6ddd807d3ada2971accc1e Mon Sep 17 00:00:00 2001 From: Liz Fong-Jones Date: Tue, 31 Dec 2019 15:13:31 -0500 Subject: [PATCH 2/3] Standardize dogstatsd,stdout,prometheus calling. * Creates NewRawExporter, NewExportPipeline, InstallNewPipeline methods. * Uses Options rather than Config throughout for options. --- example/basic/main.go | 12 +--- example/prometheus/main.go | 17 ++---- exporter/metric/dogstatsd/dogstatsd.go | 59 ++++++++++++++++--- exporter/metric/dogstatsd/dogstatsd_test.go | 2 +- exporter/metric/dogstatsd/example_test.go | 16 +---- exporter/metric/internal/statsd/conn.go | 22 +++---- exporter/metric/internal/statsd/conn_test.go | 8 +-- exporter/metric/prometheus/prometheus.go | 44 +++++++++++++- exporter/metric/prometheus/prometheus_test.go | 8 +-- exporter/metric/stdout/stdout.go | 34 +++++++++-- exporter/metric/stdout/stdout_test.go | 6 +- sdk/metric/example_test.go | 11 +--- 12 files changed, 152 insertions(+), 87 deletions(-) diff --git a/example/basic/main.go b/example/basic/main.go index 7950e8026fe..d84f5e051cf 100644 --- a/example/basic/main.go +++ b/example/basic/main.go @@ -17,7 +17,6 @@ package main import ( "context" "log" - "time" "go.opentelemetry.io/otel/api/distributedcontext" "go.opentelemetry.io/otel/api/global" @@ -26,10 +25,7 @@ import ( "go.opentelemetry.io/otel/api/trace" metricstdout "go.opentelemetry.io/otel/exporter/metric/stdout" tracestdout "go.opentelemetry.io/otel/exporter/trace/stdout" - metricsdk "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/metric/batcher/defaultkeys" "go.opentelemetry.io/otel/sdk/metric/controller/push" - "go.opentelemetry.io/otel/sdk/metric/selector/simple" sdktrace "go.opentelemetry.io/otel/sdk/trace" ) @@ -57,19 +53,13 @@ func initTracer() { } func initMeter() *push.Controller { - selector := simple.NewWithExactMeasure() - exporter, err := metricstdout.New(metricstdout.Options{ + pusher, err := metricstdout.InstallNewPipeline(metricstdout.Options{ Quantiles: []float64{0.5, 0.9, 0.99}, PrettyPrint: false, }) if err != nil { log.Panicf("failed to initialize metric stdout exporter %v", err) } - batcher := defaultkeys.New(selector, metricsdk.NewDefaultLabelEncoder(), true) - pusher := push.New(batcher, exporter, time.Second) - pusher.Start() - - global.SetMeterProvider(pusher) return pusher } diff --git a/example/prometheus/main.go b/example/prometheus/main.go index 07a57385be8..fc47822a33f 100644 --- a/example/prometheus/main.go +++ b/example/prometheus/main.go @@ -24,10 +24,7 @@ import ( "go.opentelemetry.io/otel/api/key" "go.opentelemetry.io/otel/api/metric" "go.opentelemetry.io/otel/exporter/metric/prometheus" - sdkmetric "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/metric/batcher/defaultkeys" "go.opentelemetry.io/otel/sdk/metric/controller/push" - "go.opentelemetry.io/otel/sdk/metric/selector/simple" ) var ( @@ -37,23 +34,17 @@ var ( ) func initMeter() *push.Controller { - selector := simple.NewWithExactMeasure() - exporter, err := prometheus.NewExporter(prometheus.Options{ + pusher, hf, err := prometheus.InstallNewPipeline(prometheus.Options{ DefaultHistogramBuckets: []float64{0., 10., 15., 20.}, }) - if err != nil { - log.Panicf("failed to initialize metric stdout exporter %v", err) + log.Panicf("failed to initialize prometheus exporter %v", err) } - batcher := defaultkeys.New(selector, sdkmetric.NewDefaultLabelEncoder(), false) - pusher := push.New(batcher, exporter, time.Second) - pusher.Start() - + http.HandleFunc("/", hf) go func() { - _ = http.ListenAndServe(":2222", exporter) + _ = http.ListenAndServe(":2222", nil) }() - global.SetMeterProvider(pusher) return pusher } diff --git a/exporter/metric/dogstatsd/dogstatsd.go b/exporter/metric/dogstatsd/dogstatsd.go index 01c71a65964..b61c69303ce 100644 --- a/exporter/metric/dogstatsd/dogstatsd.go +++ b/exporter/metric/dogstatsd/dogstatsd.go @@ -16,13 +16,19 @@ package dogstatsd // import "go.opentelemetry.io/otel/exporter/metric/dogstatsd" import ( "bytes" + "time" + "go.opentelemetry.io/otel/api/global" "go.opentelemetry.io/otel/exporter/metric/internal/statsd" + export "go.opentelemetry.io/otel/sdk/export/metric" + "go.opentelemetry.io/otel/sdk/metric/batcher/ungrouped" + "go.opentelemetry.io/otel/sdk/metric/controller/push" + "go.opentelemetry.io/otel/sdk/metric/selector/simple" ) type ( - Config = statsd.Config + Options = statsd.Options // Exporter implements a dogstatsd-format statsd exporter, // which encodes label sets as independent fields in the @@ -45,20 +51,59 @@ var ( _ export.LabelEncoder = &Exporter{} ) -// New returns a new Dogstatsd-syntax exporter. This type implements -// the metric.LabelEncoder interface, allowing the SDK's unique label -// encoding to be pre-computed for the exporter and stored in the -// LabelSet. -func New(config Config) (*Exporter, error) { +// NewRawExporter returns a new Dogstatsd-syntax exporter for use in a pipeline. +// This type implements the metric.LabelEncoder interface, +// allowing the SDK's unique label encoding to be pre-computed +// for the exporter and stored in the LabelSet. +func NewRawExporter(options Options) (*Exporter, error) { exp := &Exporter{ LabelEncoder: statsd.NewLabelEncoder(), } var err error - exp.Exporter, err = statsd.NewExporter(config, exp) + exp.Exporter, err = statsd.NewExporter(options, exp) return exp, err } +// InstallNewPipeline instantiates a NewExportPipeline and registers it globally. +// Typically called as: +// pipeline, err := dogstatsd.InstallNewPipeline(dogstatsd.Options{...}) +// if err != nil { +// ... +// } +// defer pipeline.Stop() +// ... Done +func InstallNewPipeline(options Options) (*push.Controller, error) { + controller, err := NewExportPipeline(options) + if err != nil { + return controller, err + } + global.SetMeterProvider(controller) + return controller, err +} + +// NewExportPipeline sets up a complete export pipeline with the recommended setup, +// chaining a NewRawExporter into the recommended selectors and batchers. +func NewExportPipeline(options Options) (*push.Controller, error) { + selector := simple.NewWithExactMeasure() + exporter, err := NewRawExporter(options) + if err != nil { + return nil, err + } + + // The ungrouped batcher ensures that the export sees the full + // set of labels as dogstatsd tags. + batcher := ungrouped.New(selector, false) + + // The pusher automatically recognizes that the exporter + // implements the LabelEncoder interface, which ensures the + // export encoding for labels is encoded in the LabelSet. + pusher := push.New(batcher, exporter, time.Hour) + pusher.Start() + + return pusher, nil +} + // AppendName is part of the stats-internal adapter interface. func (*Exporter) AppendName(rec export.Record, buf *bytes.Buffer) { _, _ = buf.WriteString(rec.Descriptor().Name()) diff --git a/exporter/metric/dogstatsd/dogstatsd_test.go b/exporter/metric/dogstatsd/dogstatsd_test.go index c4fc8609dc8..b090b3dff52 100644 --- a/exporter/metric/dogstatsd/dogstatsd_test.go +++ b/exporter/metric/dogstatsd/dogstatsd_test.go @@ -52,7 +52,7 @@ func TestDogstatsLabels(t *testing.T) { checkpointSet.Add(desc, cagg, key.New("A").String("B")) var buf bytes.Buffer - exp, err := dogstatsd.New(dogstatsd.Config{ + exp, err := dogstatsd.NewRawExporter(dogstatsd.Options{ Writer: &buf, }) require.Nil(t, err) diff --git a/exporter/metric/dogstatsd/example_test.go b/exporter/metric/dogstatsd/example_test.go index d5a03175779..47ceceaf418 100644 --- a/exporter/metric/dogstatsd/example_test.go +++ b/exporter/metric/dogstatsd/example_test.go @@ -6,14 +6,10 @@ import ( "io" "log" "sync" - "time" "go.opentelemetry.io/otel/api/key" "go.opentelemetry.io/otel/api/metric" "go.opentelemetry.io/otel/exporter/metric/dogstatsd" - "go.opentelemetry.io/otel/sdk/metric/batcher/ungrouped" - "go.opentelemetry.io/otel/sdk/metric/controller/push" - "go.opentelemetry.io/otel/sdk/metric/selector/simple" ) func ExampleNew() { @@ -42,8 +38,7 @@ func ExampleNew() { }() // Create a meter - selector := simple.NewWithExactMeasure() - exporter, err := dogstatsd.New(dogstatsd.Config{ + pusher, err := dogstatsd.NewExportPipeline(dogstatsd.Options{ // The Writer field provides test support. Writer: writer, @@ -54,15 +49,6 @@ func ExampleNew() { if err != nil { log.Fatal("Could not initialize dogstatsd exporter:", err) } - // The ungrouped batcher ensures that the export sees the full - // set of labels as dogstatsd tags. - batcher := ungrouped.New(selector, false) - - // The pusher automatically recognizes that the exporter - // implements the LabelEncoder interface, which ensures the - // export encoding for labels is encoded in the LabelSet. - pusher := push.New(batcher, exporter, time.Hour) - pusher.Start() ctx := context.Background() diff --git a/exporter/metric/internal/statsd/conn.go b/exporter/metric/internal/statsd/conn.go index 186d8c8e3c0..9998c93f97b 100644 --- a/exporter/metric/internal/statsd/conn.go +++ b/exporter/metric/internal/statsd/conn.go @@ -34,8 +34,8 @@ import ( ) type ( - // Config supports common options that apply to statsd exporters. - Config struct { + // Options supports common options that apply to statsd exporters. + Options struct { // URL describes the destination for exporting statsd data. // e.g., udp://host:port // tcp://host:port @@ -57,7 +57,7 @@ type ( // exporters. Exporter struct { adapter Adapter - config Config + options Options conn net.Conn writer io.Writer buffer bytes.Buffer @@ -88,17 +88,17 @@ var ( // NewExport returns a common implementation for exporters that Export // statsd syntax. -func NewExporter(config Config, adapter Adapter) (*Exporter, error) { - if config.MaxPacketSize <= 0 { - config.MaxPacketSize = MaxPacketSize +func NewExporter(options Options, adapter Adapter) (*Exporter, error) { + if options.MaxPacketSize <= 0 { + options.MaxPacketSize = MaxPacketSize } var writer io.Writer var conn net.Conn var err error - if config.Writer != nil { - writer = config.Writer + if options.Writer != nil { + writer = options.Writer } else { - conn, err = dial(config.URL) + conn, err = dial(options.URL) if conn != nil { writer = conn } @@ -108,7 +108,7 @@ func NewExporter(config Config, adapter Adapter) (*Exporter, error) { // Start() and Stop() API. return &Exporter{ adapter: adapter, - config: config, + options: options, conn: conn, writer: writer, }, err @@ -171,7 +171,7 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) return } - if buf.Len() < e.config.MaxPacketSize { + if buf.Len() < e.options.MaxPacketSize { return } if before == 0 { diff --git a/exporter/metric/internal/statsd/conn_test.go b/exporter/metric/internal/statsd/conn_test.go index eb0cd525a64..537e3b14df0 100644 --- a/exporter/metric/internal/statsd/conn_test.go +++ b/exporter/metric/internal/statsd/conn_test.go @@ -113,11 +113,11 @@ timer.B.D:%s|ms t.Run(nkind.String(), func(t *testing.T) { ctx := context.Background() writer := &testWriter{} - config := statsd.Config{ + options := statsd.Options{ Writer: writer, MaxPacketSize: 1024, } - exp, err := statsd.NewExporter(config, adapter) + exp, err := statsd.NewExporter(options, adapter) if err != nil { t.Fatal("New error: ", err) } @@ -274,12 +274,12 @@ func TestPacketSplit(t *testing.T) { t.Run(tcase.name, func(t *testing.T) { ctx := context.Background() writer := &testWriter{} - config := statsd.Config{ + options := statsd.Options{ Writer: writer, MaxPacketSize: 1024, } adapter := newWithTagsAdapter() - exp, err := statsd.NewExporter(config, adapter) + exp, err := statsd.NewExporter(options, adapter) if err != nil { t.Fatal("New error: ", err) } diff --git a/exporter/metric/prometheus/prometheus.go b/exporter/metric/prometheus/prometheus.go index 5bc593f66da..4e744c1a3c5 100644 --- a/exporter/metric/prometheus/prometheus.go +++ b/exporter/metric/prometheus/prometheus.go @@ -17,13 +17,19 @@ package prometheus import ( "context" "net/http" + "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "go.opentelemetry.io/otel/api/core" + "go.opentelemetry.io/otel/api/global" export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregator" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/batcher/defaultkeys" + "go.opentelemetry.io/otel/sdk/metric/controller/push" + "go.opentelemetry.io/otel/sdk/metric/selector/simple" ) type metricKey struct { @@ -90,8 +96,9 @@ const ( Summary ) -// NewExporter returns a new prometheus exporter for prometheus metrics. -func NewExporter(opts Options) (*Exporter, error) { +// NewRawExporter returns a new prometheus exporter for prometheus metrics +// for use in a pipeline. +func NewRawExporter(opts Options) (*Exporter, error) { if opts.Registry == nil { opts.Registry = prometheus.NewRegistry() } @@ -117,6 +124,39 @@ func NewExporter(opts Options) (*Exporter, error) { }, nil } +// InstallNewPipeline instantiates a NewExportPipeline and registers it globally. +// Typically called as: +// pipeline, hf, err := prometheus.InstallNewPipeline(prometheus.Options{...}) +// if err != nil { +// ... +// } +// http.HandleFunc("/metrics", hf) +// defer pipeline.Stop() +// ... Done +func InstallNewPipeline(options Options) (*push.Controller, http.HandlerFunc, error) { + controller, hf, err := NewExportPipeline(options) + if err != nil { + return controller, hf, err + } + global.SetMeterProvider(controller) + return controller, hf, err +} + +// NewExportPipeline sets up a complete export pipeline with the recommended setup, +// chaining a NewRawExporter into the recommended selectors and batchers. +func NewExportPipeline(options Options) (*push.Controller, http.HandlerFunc, error) { + selector := simple.NewWithExactMeasure() + exporter, err := NewRawExporter(options) + if err != nil { + return nil, nil, err + } + batcher := defaultkeys.New(selector, sdkmetric.NewDefaultLabelEncoder(), false) + pusher := push.New(batcher, exporter, time.Second) + pusher.Start() + + return pusher, exporter.ServeHTTP, nil +} + // Export exports the provide metric record to prometheus. func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error { var forEachError error diff --git a/exporter/metric/prometheus/prometheus_test.go b/exporter/metric/prometheus/prometheus_test.go index 1cbd252d1f0..fbfe0043490 100644 --- a/exporter/metric/prometheus/prometheus_test.go +++ b/exporter/metric/prometheus/prometheus_test.go @@ -19,11 +19,11 @@ import ( ) func TestPrometheusExporter(t *testing.T) { - exporter, err := prometheus.NewExporter(prometheus.Options{ + exporter, err := prometheus.NewRawExporter(prometheus.Options{ DefaultHistogramBuckets: []float64{0., 10., 15., 20.}, }) if err != nil { - log.Panicf("failed to initialize metric stdout exporter %v", err) + log.Panicf("failed to initialize prometheus exporter %v", err) } var expected []string @@ -82,11 +82,11 @@ func TestPrometheusExporter(t *testing.T) { } func TestPrometheusExporter_Summaries(t *testing.T) { - exporter, err := prometheus.NewExporter(prometheus.Options{ + exporter, err := prometheus.NewRawExporter(prometheus.Options{ MeasureAggregation: prometheus.Summary, }) if err != nil { - log.Panicf("failed to initialize metric stdout exporter %v", err) + log.Panicf("failed to initialize prometheus exporter %v", err) } var expected []string diff --git a/exporter/metric/stdout/stdout.go b/exporter/metric/stdout/stdout.go index 7a34f854b4a..c53500adef7 100644 --- a/exporter/metric/stdout/stdout.go +++ b/exporter/metric/stdout/stdout.go @@ -23,9 +23,12 @@ import ( "strings" "time" + "go.opentelemetry.io/otel/api/global" + export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregator" - "go.opentelemetry.io/otel/sdk/metric/batcher/ungrouped" + metricsdk "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/batcher/defaultkeys" "go.opentelemetry.io/otel/sdk/metric/controller/push" "go.opentelemetry.io/otel/sdk/metric/selector/simple" ) @@ -83,7 +86,8 @@ type expoQuantile struct { V interface{} `json:"v"` } -func New(options Options) (*Exporter, error) { +// NewRawExporter creates a stdout Exporter for use in a pipeline. +func NewRawExporter(options Options) (*Exporter, error) { if options.File == nil { options.File = os.Stdout } @@ -101,14 +105,32 @@ func New(options Options) (*Exporter, error) { }, nil } -// NewExportPipeline sets up a complete export pipeline with the recommended setup +// InstallNewPipeline instantiates a NewExportPipeline and registers it globally. +// Typically called as: +// pipeline, err := stdout.InstallNewPipeline(stdout.Options{...}) +// if err != nil { +// ... +// } +// defer pipeline.Stop() +// ... Done +func InstallNewPipeline(options Options) (*push.Controller, error) { + controller, err := NewExportPipeline(options) + if err != nil { + return controller, err + } + global.SetMeterProvider(controller) + return controller, err +} + +// NewExportPipeline sets up a complete export pipeline with the recommended setup, +// chaining a NewRawExporter into the recommended selectors and batchers. func NewExportPipeline(options Options) (*push.Controller, error) { - selector := simple.NewWithInexpensiveMeasure() - exporter, err := New(options) + selector := simple.NewWithExactMeasure() + exporter, err := NewRawExporter(options) if err != nil { return nil, err } - batcher := ungrouped.New(selector, true) + batcher := defaultkeys.New(selector, metricsdk.NewDefaultLabelEncoder(), true) pusher := push.New(batcher, exporter, time.Second) pusher.Start() diff --git a/exporter/metric/stdout/stdout_test.go b/exporter/metric/stdout/stdout_test.go index ed680709bfa..e60ae022ecc 100644 --- a/exporter/metric/stdout/stdout_test.go +++ b/exporter/metric/stdout/stdout_test.go @@ -36,7 +36,7 @@ func newFixture(t *testing.T, options stdout.Options) testFixture { buf := &bytes.Buffer{} options.File = buf options.DoNotPrintTime = true - exp, err := stdout.New(options) + exp, err := stdout.NewRawExporter(options) if err != nil { t.Fatal("Error building fixture: ", err) } @@ -60,7 +60,7 @@ func (fix testFixture) Export(checkpointSet export.CheckpointSet) { } func TestStdoutInvalidQuantile(t *testing.T) { - _, err := stdout.New(stdout.Options{ + _, err := stdout.NewRawExporter(stdout.Options{ Quantiles: []float64{1.1, 0.9}, }) require.Error(t, err, "Invalid quantile error expected") @@ -69,7 +69,7 @@ func TestStdoutInvalidQuantile(t *testing.T) { func TestStdoutTimestamp(t *testing.T) { var buf bytes.Buffer - exporter, err := stdout.New(stdout.Options{ + exporter, err := stdout.NewRawExporter(stdout.Options{ File: &buf, DoNotPrintTime: false, }) diff --git a/sdk/metric/example_test.go b/sdk/metric/example_test.go index 84fb715bd3e..112204dc232 100644 --- a/sdk/metric/example_test.go +++ b/sdk/metric/example_test.go @@ -17,29 +17,20 @@ package metric_test import ( "context" "fmt" - "time" "go.opentelemetry.io/otel/api/key" "go.opentelemetry.io/otel/api/metric" "go.opentelemetry.io/otel/exporter/metric/stdout" - sdk "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/metric/batcher/defaultkeys" - "go.opentelemetry.io/otel/sdk/metric/controller/push" - "go.opentelemetry.io/otel/sdk/metric/selector/simple" ) func ExampleNew() { - selector := simple.NewWithInexpensiveMeasure() - exporter, err := stdout.New(stdout.Options{ + pusher, err := stdout.NewExportPipeline(stdout.Options{ PrettyPrint: true, DoNotPrintTime: true, // This makes the output deterministic }) if err != nil { panic(fmt.Sprintln("Could not initialize stdout exporter:", err)) } - batcher := defaultkeys.New(selector, sdk.NewDefaultLabelEncoder(), true) - pusher := push.New(batcher, exporter, time.Second) - pusher.Start() defer pusher.Stop() ctx := context.Background() From fa9482b7d54d43e1f3f46119d7c7f968126fbefb Mon Sep 17 00:00:00 2001 From: Liz Fong-Jones Date: Tue, 31 Dec 2019 15:43:21 -0500 Subject: [PATCH 3/3] fix merge conflicts. --- api/global/internal/meter_test.go | 29 +++--------- example/prometheus/main.go | 4 +- exporter/metric/prometheus/prometheus.go | 16 ++++--- exporter/metric/prometheus/prometheus_test.go | 47 +------------------ 4 files changed, 17 insertions(+), 79 deletions(-) diff --git a/api/global/internal/meter_test.go b/api/global/internal/meter_test.go index e48d5b47662..7b1512c7456 100644 --- a/api/global/internal/meter_test.go +++ b/api/global/internal/meter_test.go @@ -5,7 +5,6 @@ import ( "io" "io/ioutil" "testing" - "time" "github.com/stretchr/testify/require" @@ -15,9 +14,6 @@ import ( "go.opentelemetry.io/otel/api/key" "go.opentelemetry.io/otel/exporter/metric/stdout" metrictest "go.opentelemetry.io/otel/internal/metric" - "go.opentelemetry.io/otel/sdk/metric/batcher/ungrouped" - "go.opentelemetry.io/otel/sdk/metric/controller/push" - "go.opentelemetry.io/otel/sdk/metric/selector/simple" ) func TestDirect(t *testing.T) { @@ -207,26 +203,13 @@ func TestDefaultSDK(t *testing.T) { counter.Add(ctx, 1, labels1) in, out := io.Pipe() - // TODO this should equal a stdout.NewPipeline(), use it. - // Consider also moving the io.Pipe() and go func() call - // below into a test helper somewhere. - sdk := func(options stdout.Options) *push.Controller { - selector := simple.NewWithInexpensiveMeasure() - exporter, err := stdout.New(options) - if err != nil { - panic(err) - } - batcher := ungrouped.New(selector, true) - pusher := push.New(batcher, exporter, time.Second) - pusher.Start() - - return pusher - }(stdout.Options{ + pusher, err := stdout.InstallNewPipeline(stdout.Options{ File: out, DoNotPrintTime: true, }) - - global.SetMeterProvider(sdk) + if err != nil { + panic(err) + } counter.Add(ctx, 1, labels1) @@ -236,9 +219,9 @@ func TestDefaultSDK(t *testing.T) { ch <- string(data) }() - sdk.Stop() + pusher.Stop() out.Close() - require.Equal(t, `{"updates":[{"name":"test.builtin{A=B}","sum":1}]} + require.Equal(t, `{"updates":[{"name":"test.builtin","sum":1}]} `, <-ch) } diff --git a/example/prometheus/main.go b/example/prometheus/main.go index fc47822a33f..e5f5becf1d7 100644 --- a/example/prometheus/main.go +++ b/example/prometheus/main.go @@ -34,9 +34,7 @@ var ( ) func initMeter() *push.Controller { - pusher, hf, err := prometheus.InstallNewPipeline(prometheus.Options{ - DefaultHistogramBuckets: []float64{0., 10., 15., 20.}, - }) + pusher, hf, err := prometheus.InstallNewPipeline(prometheus.Options{}) if err != nil { log.Panicf("failed to initialize prometheus exporter %v", err) } diff --git a/exporter/metric/prometheus/prometheus.go b/exporter/metric/prometheus/prometheus.go index 3b2606079f6..90cc7bc1fee 100644 --- a/exporter/metric/prometheus/prometheus.go +++ b/exporter/metric/prometheus/prometheus.go @@ -79,13 +79,6 @@ type Options struct { OnError func(error) } -type MeasureAggregation int - -const ( - Histogram MeasureAggregation = iota - Summary -) - // NewRawExporter returns a new prometheus exporter for prometheus metrics // for use in a pipeline. func NewRawExporter(opts Options) (*Exporter, error) { @@ -148,6 +141,15 @@ func NewExportPipeline(options Options) (*push.Controller, http.HandlerFunc, err if err != nil { return nil, nil, err } + + // Prometheus needs to use a stateful batcher since counters (and histogram since they are a collection of Counters) + // are cumulative (i.e., monotonically increasing values) and should not be resetted after each export. + // + // Prometheus uses this approach to be resilient to scrape failures. + // If a Prometheus server tries to scrape metrics from a host and fails for some reason, + // it could try again on the next scrape and no data would be lost, only resolution. + // + // Gauges (or LastValues) and Summaries are an exception to this and have different behaviors. batcher := defaultkeys.New(selector, sdkmetric.NewDefaultLabelEncoder(), false) pusher := push.New(batcher, exporter, time.Second) pusher.Start() diff --git a/exporter/metric/prometheus/prometheus_test.go b/exporter/metric/prometheus/prometheus_test.go index 6bc11ce0656..602731ee4f5 100644 --- a/exporter/metric/prometheus/prometheus_test.go +++ b/exporter/metric/prometheus/prometheus_test.go @@ -20,7 +20,7 @@ import ( func TestPrometheusExporter(t *testing.T) { exporter, err := prometheus.NewRawExporter(prometheus.Options{ - DefaultHistogramBuckets: []float64{0., 10., 15., 20.}, + DefaultSummaryQuantiles: []float64{0.5, 0.9, 0.99}, }) if err != nil { log.Panicf("failed to initialize prometheus exporter %v", err) @@ -67,51 +67,6 @@ func TestPrometheusExporter(t *testing.T) { checkpointSet.AddGauge(gauge, 32, missingLabels...) expected = append(expected, `gauge{A="E",C=""} 32`) - checkpointSet.AddMeasure(measure, 19, missingLabels...) - expected = append(expected, `measure_bucket{A="E",C="",le="+Inf"} 1`) - expected = append(expected, `measure_bucket{A="E",C="",le="0"} 0`) - expected = append(expected, `measure_bucket{A="E",C="",le="10"} 0`) - expected = append(expected, `measure_bucket{A="E",C="",le="15"} 0`) - expected = append(expected, `measure_bucket{A="E",C="",le="20"} 1`) - expected = append(expected, `measure_count{A="E",C=""} 1`) - expected = append(expected, `measure_sum{A="E",C=""} 19`) - - compareExport(t, exporter, checkpointSet, expected) -} - -func TestPrometheusExporter_Summaries(t *testing.T) { - exporter, err := prometheus.NewRawExporter(prometheus.Options{ - MeasureAggregation: prometheus.Summary, - }) - if err != nil { - log.Panicf("failed to initialize prometheus exporter %v", err) - } - - var expected []string - checkpointSet := test.NewCheckpointSet(metric.NewDefaultLabelEncoder()) - - measure := export.NewDescriptor( - "measure", export.MeasureKind, nil, "", "", core.Float64NumberKind, false) - - labels := []core.KeyValue{ - key.New("A").String("B"), - key.New("C").String("D"), - } - - checkpointSet.AddMeasure(measure, 13, labels...) - checkpointSet.AddMeasure(measure, 15, labels...) - checkpointSet.AddMeasure(measure, 17, labels...) - expected = append(expected, `measure_count{A="B",C="D"} 3`) - expected = append(expected, `measure_sum{A="B",C="D"} 45`) - expected = append(expected, `measure{A="B",C="D",quantile="0.5"} 15`) - expected = append(expected, `measure{A="B",C="D",quantile="0.9"} 17`) - expected = append(expected, `measure{A="B",C="D",quantile="0.99"} 17`) - - missingLabels := []core.KeyValue{ - key.New("A").String("E"), - key.New("C").String(""), - } - checkpointSet.AddMeasure(measure, 19, missingLabels...) expected = append(expected, `measure{A="E",C="",quantile="0.5"} 19`) expected = append(expected, `measure{A="E",C="",quantile="0.9"} 19`)