Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[metrics] standardize/simplify export pipeline setup #395

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 6 additions & 23 deletions api/global/internal/meter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"io"
"io/ioutil"
"testing"
"time"

"github.com/stretchr/testify/require"

Expand All @@ -15,9 +14,6 @@ import (
"go.opentelemetry.io/otel/api/key"
"go.opentelemetry.io/otel/exporter/metric/stdout"
metrictest "go.opentelemetry.io/otel/internal/metric"
"go.opentelemetry.io/otel/sdk/metric/batcher/ungrouped"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
)

func TestDirect(t *testing.T) {
Expand Down Expand Up @@ -207,26 +203,13 @@ func TestDefaultSDK(t *testing.T) {
counter.Add(ctx, 1, labels1)

in, out := io.Pipe()
// TODO this should equal a stdout.NewPipeline(), use it.
// Consider also moving the io.Pipe() and go func() call
// below into a test helper somewhere.
sdk := func(options stdout.Options) *push.Controller {
selector := simple.NewWithInexpensiveMeasure()
exporter, err := stdout.New(options)
if err != nil {
panic(err)
}
batcher := ungrouped.New(selector, true)
pusher := push.New(batcher, exporter, time.Second)
pusher.Start()

return pusher
}(stdout.Options{
pusher, err := stdout.InstallNewPipeline(stdout.Options{
Writer: out,
DoNotPrintTime: true,
})

global.SetMeterProvider(sdk)
if err != nil {
panic(err)
}

counter.Add(ctx, 1, labels1)

Expand All @@ -236,9 +219,9 @@ func TestDefaultSDK(t *testing.T) {
ch <- string(data)
}()

sdk.Stop()
pusher.Stop()
out.Close()

require.Equal(t, `{"updates":[{"name":"test.builtin{A=B}","sum":1}]}
require.Equal(t, `{"updates":[{"name":"test.builtin","sum":1}]}
`, <-ch)
}
12 changes: 1 addition & 11 deletions example/basic/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package main
import (
"context"
"log"
"time"

"go.opentelemetry.io/otel/api/distributedcontext"
"go.opentelemetry.io/otel/api/global"
Expand All @@ -26,10 +25,7 @@ import (
"go.opentelemetry.io/otel/api/trace"
metricstdout "go.opentelemetry.io/otel/exporter/metric/stdout"
tracestdout "go.opentelemetry.io/otel/exporter/trace/stdout"
metricsdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/batcher/defaultkeys"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

Expand Down Expand Up @@ -57,19 +53,13 @@ func initTracer() {
}

func initMeter() *push.Controller {
selector := simple.NewWithExactMeasure()
exporter, err := metricstdout.New(metricstdout.Options{
pusher, err := metricstdout.InstallNewPipeline(metricstdout.Options{
Quantiles: []float64{0.5, 0.9, 0.99},
PrettyPrint: false,
})
if err != nil {
log.Panicf("failed to initialize metric stdout exporter %v", err)
}
batcher := defaultkeys.New(selector, metricsdk.NewDefaultLabelEncoder(), true)
pusher := push.New(batcher, exporter, time.Second)
pusher.Start()

global.SetMeterProvider(pusher)
return pusher
}

Expand Down
25 changes: 4 additions & 21 deletions example/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@ import (
"go.opentelemetry.io/otel/api/key"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/exporter/metric/prometheus"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/batcher/defaultkeys"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
)

var (
Expand All @@ -37,29 +34,15 @@ var (
)

func initMeter() *push.Controller {
selector := simple.NewWithExactMeasure()
exporter, err := prometheus.NewExporter(prometheus.Options{})

pusher, hf, err := prometheus.InstallNewPipeline(prometheus.Options{})
if err != nil {
log.Panicf("failed to initialize metric stdout exporter %v", err)
log.Panicf("failed to initialize prometheus exporter %v", err)
}
// Prometheus needs to use a stateful batcher since counters (and histogram since they are a collection of Counters)
// are cumulative (i.e., monotonically increasing values) and should not be resetted after each export.
//
// Prometheus uses this approach to be resilient to scrape failures.
// If a Prometheus server tries to scrape metrics from a host and fails for some reason,
// it could try again on the next scrape and no data would be lost, only resolution.
//
// Gauges (or LastValues) and Summaries are an exception to this and have different behaviors.
batcher := defaultkeys.New(selector, sdkmetric.NewDefaultLabelEncoder(), true)
pusher := push.New(batcher, exporter, time.Second)
pusher.Start()

http.HandleFunc("/", hf)
go func() {
_ = http.ListenAndServe(":2222", exporter)
_ = http.ListenAndServe(":2222", nil)
}()

global.SetMeterProvider(pusher)
return pusher
}

Expand Down
59 changes: 52 additions & 7 deletions exporter/metric/dogstatsd/dogstatsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,19 @@ package dogstatsd // import "go.opentelemetry.io/otel/exporter/metric/dogstatsd"

import (
"bytes"
"time"

"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/exporter/metric/internal/statsd"

export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/metric/batcher/ungrouped"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
)

type (
Config = statsd.Config
Options = statsd.Options

// Exporter implements a dogstatsd-format statsd exporter,
// which encodes label sets as independent fields in the
Expand All @@ -45,20 +51,59 @@ var (
_ export.LabelEncoder = &Exporter{}
)

// New returns a new Dogstatsd-syntax exporter. This type implements
// the metric.LabelEncoder interface, allowing the SDK's unique label
// encoding to be pre-computed for the exporter and stored in the
// LabelSet.
func New(config Config) (*Exporter, error) {
// NewRawExporter returns a new Dogstatsd-syntax exporter for use in a pipeline.
// This type implements the metric.LabelEncoder interface,
// allowing the SDK's unique label encoding to be pre-computed
// for the exporter and stored in the LabelSet.
func NewRawExporter(options Options) (*Exporter, error) {
exp := &Exporter{
LabelEncoder: statsd.NewLabelEncoder(),
}

var err error
exp.Exporter, err = statsd.NewExporter(config, exp)
exp.Exporter, err = statsd.NewExporter(options, exp)
return exp, err
}

// InstallNewPipeline instantiates a NewExportPipeline and registers it globally.
// Typically called as:
// pipeline, err := dogstatsd.InstallNewPipeline(dogstatsd.Options{...})
// if err != nil {
// ...
// }
// defer pipeline.Stop()
// ... Done
func InstallNewPipeline(options Options) (*push.Controller, error) {
controller, err := NewExportPipeline(options)
if err != nil {
return controller, err
}
global.SetMeterProvider(controller)
return controller, err
}

// NewExportPipeline sets up a complete export pipeline with the recommended setup,
// chaining a NewRawExporter into the recommended selectors and batchers.
func NewExportPipeline(options Options) (*push.Controller, error) {
selector := simple.NewWithExactMeasure()
exporter, err := NewRawExporter(options)
if err != nil {
return nil, err
}

// The ungrouped batcher ensures that the export sees the full
// set of labels as dogstatsd tags.
batcher := ungrouped.New(selector, false)

// The pusher automatically recognizes that the exporter
// implements the LabelEncoder interface, which ensures the
// export encoding for labels is encoded in the LabelSet.
pusher := push.New(batcher, exporter, time.Hour)
pusher.Start()

return pusher, nil
}

// AppendName is part of the stats-internal adapter interface.
func (*Exporter) AppendName(rec export.Record, buf *bytes.Buffer) {
_, _ = buf.WriteString(rec.Descriptor().Name())
Expand Down
2 changes: 1 addition & 1 deletion exporter/metric/dogstatsd/dogstatsd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestDogstatsLabels(t *testing.T) {
checkpointSet.Add(desc, cagg, key.New("A").String("B"))

var buf bytes.Buffer
exp, err := dogstatsd.New(dogstatsd.Config{
exp, err := dogstatsd.NewRawExporter(dogstatsd.Options{
Writer: &buf,
})
require.Nil(t, err)
Expand Down
16 changes: 1 addition & 15 deletions exporter/metric/dogstatsd/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,10 @@ import (
"io"
"log"
"sync"
"time"

"go.opentelemetry.io/otel/api/key"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/exporter/metric/dogstatsd"
"go.opentelemetry.io/otel/sdk/metric/batcher/ungrouped"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
)

func ExampleNew() {
Expand Down Expand Up @@ -42,8 +38,7 @@ func ExampleNew() {
}()

// Create a meter
selector := simple.NewWithExactMeasure()
exporter, err := dogstatsd.New(dogstatsd.Config{
pusher, err := dogstatsd.NewExportPipeline(dogstatsd.Options{
// The Writer field provides test support.
Writer: writer,

Expand All @@ -54,15 +49,6 @@ func ExampleNew() {
if err != nil {
log.Fatal("Could not initialize dogstatsd exporter:", err)
}
// The ungrouped batcher ensures that the export sees the full
// set of labels as dogstatsd tags.
batcher := ungrouped.New(selector, false)

// The pusher automatically recognizes that the exporter
// implements the LabelEncoder interface, which ensures the
// export encoding for labels is encoded in the LabelSet.
pusher := push.New(batcher, exporter, time.Hour)
pusher.Start()

ctx := context.Background()

Expand Down
22 changes: 11 additions & 11 deletions exporter/metric/internal/statsd/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ import (
)

type (
// Config supports common options that apply to statsd exporters.
Config struct {
// Options supports common options that apply to statsd exporters.
Options struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is an unrelated remark.

I thought we had moved toward the practice of using "Config" as the structure name, and Option as a functional argument (func(*Config)), and Options as a []Option. See api/trace StartConfig and StartOption, for example.

That's why I prefer this struct be called Config.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #369

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The goal of this was to standardize the patterns for all Exporter options/config. Prometheus and stdout were using options, statsd was using config...

// URL describes the destination for exporting statsd data.
// e.g., udp://host:port
// tcp://host:port
Expand All @@ -57,7 +57,7 @@ type (
// exporters.
Exporter struct {
adapter Adapter
config Config
options Options
conn net.Conn
writer io.Writer
buffer bytes.Buffer
Expand Down Expand Up @@ -88,17 +88,17 @@ var (

// NewExport returns a common implementation for exporters that Export
// statsd syntax.
func NewExporter(config Config, adapter Adapter) (*Exporter, error) {
if config.MaxPacketSize <= 0 {
config.MaxPacketSize = MaxPacketSize
func NewExporter(options Options, adapter Adapter) (*Exporter, error) {
if options.MaxPacketSize <= 0 {
options.MaxPacketSize = MaxPacketSize
}
var writer io.Writer
var conn net.Conn
var err error
if config.Writer != nil {
writer = config.Writer
if options.Writer != nil {
writer = options.Writer
} else {
conn, err = dial(config.URL)
conn, err = dial(options.URL)
if conn != nil {
writer = conn
}
Expand All @@ -108,7 +108,7 @@ func NewExporter(config Config, adapter Adapter) (*Exporter, error) {
// Start() and Stop() API.
return &Exporter{
adapter: adapter,
config: config,
options: options,
conn: conn,
writer: writer,
}, err
Expand Down Expand Up @@ -171,7 +171,7 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet)
return
}

if buf.Len() < e.config.MaxPacketSize {
if buf.Len() < e.options.MaxPacketSize {
return
}
if before == 0 {
Expand Down
8 changes: 4 additions & 4 deletions exporter/metric/internal/statsd/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,11 @@ timer.B.D:%s|ms
t.Run(nkind.String(), func(t *testing.T) {
ctx := context.Background()
writer := &testWriter{}
config := statsd.Config{
options := statsd.Options{
Writer: writer,
MaxPacketSize: 1024,
}
exp, err := statsd.NewExporter(config, adapter)
exp, err := statsd.NewExporter(options, adapter)
if err != nil {
t.Fatal("New error: ", err)
}
Expand Down Expand Up @@ -274,12 +274,12 @@ func TestPacketSplit(t *testing.T) {
t.Run(tcase.name, func(t *testing.T) {
ctx := context.Background()
writer := &testWriter{}
config := statsd.Config{
options := statsd.Options{
Writer: writer,
MaxPacketSize: 1024,
}
adapter := newWithTagsAdapter()
exp, err := statsd.NewExporter(config, adapter)
exp, err := statsd.NewExporter(options, adapter)
if err != nil {
t.Fatal("New error: ", err)
}
Expand Down
Loading