Skip to content

Commit

Permalink
Pass Resources through the metrics export pipeline (#659)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmacd authored Apr 24, 2020
1 parent cd1be0e commit 3008c1b
Show file tree
Hide file tree
Showing 18 changed files with 190 additions and 199 deletions.
22 changes: 0 additions & 22 deletions api/metric/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/unit"
"go.opentelemetry.io/otel/sdk/resource"
)

// Provider supports named Meter instances.
Expand All @@ -38,8 +37,6 @@ type Config struct {
Description string
// Unit is an optional field describing the metric instrument.
Unit unit.Unit
// Resource describes the entity for which measurements are made.
Resource *resource.Resource
// LibraryName is the name given to the Meter that created
// this instrument. See `Provider`.
LibraryName string
Expand Down Expand Up @@ -132,12 +129,6 @@ func (d Descriptor) NumberKind() core.NumberKind {
return d.numberKind
}

// Resource returns the Resource describing the entity for which the metric
// instrument measures.
func (d Descriptor) Resource() *resource.Resource {
return d.config.Resource
}

// LibraryName returns the metric instrument's library name, typically
// given via a call to Provider.Meter().
func (d Descriptor) LibraryName() string {
Expand Down Expand Up @@ -200,19 +191,6 @@ func (u unitOption) Apply(config *Config) {
config.Unit = unit.Unit(u)
}

// WithResource applies provided Resource.
//
// This will override any existing Resource.
func WithResource(r *resource.Resource) Option {
return resourceOption{r}
}

type resourceOption struct{ *resource.Resource }

func (r resourceOption) Apply(config *Config) {
config.Resource = r.Resource
}

// WithLibraryName applies provided library name. This is meant for
// use in `Provider` implementations that have not used
// `WrapMeterImpl`. Implementations built using `WrapMeterImpl` have
Expand Down
49 changes: 16 additions & 33 deletions api/metric/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/api/unit"
mockTest "go.opentelemetry.io/otel/internal/metric"
"go.opentelemetry.io/otel/sdk/resource"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
Expand All @@ -36,74 +35,58 @@ var Must = metric.Must

func TestOptions(t *testing.T) {
type testcase struct {
name string
opts []metric.Option
desc string
unit unit.Unit
resource *resource.Resource
name string
opts []metric.Option
desc string
unit unit.Unit
}
testcases := []testcase{
{
name: "no opts",
opts: nil,
desc: "",
unit: "",
resource: nil,
name: "no opts",
opts: nil,
desc: "",
unit: "",
},
{
name: "description",
opts: []metric.Option{
metric.WithDescription("stuff"),
},
desc: "stuff",
unit: "",
resource: nil,
desc: "stuff",
unit: "",
},
{
name: "description override",
opts: []metric.Option{
metric.WithDescription("stuff"),
metric.WithDescription("things"),
},
desc: "things",
unit: "",
resource: nil,
desc: "things",
unit: "",
},
{
name: "unit",
opts: []metric.Option{
metric.WithUnit("s"),
},
desc: "",
unit: "s",
resource: nil,
desc: "",
unit: "s",
},
{
name: "unit override",
opts: []metric.Option{
metric.WithUnit("s"),
metric.WithUnit("h"),
},
desc: "",
unit: "h",
resource: nil,
},
{
name: "resource override",
opts: []metric.Option{
metric.WithResource(resource.New(key.New("name").String("test-name"))),
},
desc: "",
unit: "",
resource: resource.New(key.New("name").String("test-name")),
desc: "",
unit: "h",
},
}
for idx, tt := range testcases {
t.Logf("Testing counter case %s (%d)", tt.name, idx)
if diff := cmp.Diff(metric.Configure(tt.opts), metric.Config{
Description: tt.desc,
Unit: tt.unit,
Resource: tt.resource,
}); diff != "" {
t.Errorf("Compare options: -got +want %s", diff)
}
Expand Down
26 changes: 0 additions & 26 deletions api/metric/sdkhelpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"

"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/sdk/resource"
)

// MeterImpl is a convenient interface for SDK and test
Expand Down Expand Up @@ -122,29 +121,6 @@ func Configure(opts []Option) Config {
return config
}

// Resourcer is implemented by any value that has a Resource method,
// which returns the Resource associated with the value.
// The Resource method is used to set the Resource for Descriptors of new
// metric instruments.
type Resourcer interface {
Resource() *resource.Resource
}

// insertResource inserts a WithResource option at the beginning of opts
// using the resource defined by impl if impl implements Resourcer.
//
// If opts contains a WithResource option already, that Option will take
// precedence and overwrite the Resource set from impl.
//
// The returned []Option may uses the same underlying array as opts.
func insertResource(impl MeterImpl, opts []Option) []Option {
if r, ok := impl.(Resourcer); ok {
// default to the impl resource and override if passed in opts.
return append([]Option{WithResource(r.Resource())}, opts...)
}
return opts
}

// WrapMeterImpl constructs a `Meter` implementation from a
// `MeterImpl` implementation.
func WrapMeterImpl(impl MeterImpl, libraryName string) Meter {
Expand All @@ -159,7 +135,6 @@ func (m *wrappedMeterImpl) RecordBatch(ctx context.Context, ls []core.KeyValue,
}

func (m *wrappedMeterImpl) newSync(name string, metricKind Kind, numberKind core.NumberKind, opts []Option) (SyncImpl, error) {
opts = insertResource(m.impl, opts)
desc := NewDescriptor(name, metricKind, numberKind, opts...)
desc.config.LibraryName = m.libraryName
return m.impl.NewSyncInstrument(desc)
Expand Down Expand Up @@ -222,7 +197,6 @@ func WrapFloat64MeasureInstrument(syncInst SyncImpl, err error) (Float64Measure,
}

func (m *wrappedMeterImpl) newAsync(name string, mkind Kind, nkind core.NumberKind, opts []Option, callback func(func(core.Number, []core.KeyValue))) (AsyncImpl, error) {
opts = insertResource(m.impl, opts)
desc := NewDescriptor(name, mkind, nkind, opts...)
desc.config.LibraryName = m.libraryName
return m.impl.NewAsyncInstrument(desc, callback)
Expand Down
4 changes: 3 additions & 1 deletion exporters/metric/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"go.opentelemetry.io/otel/sdk/metric/batcher/ungrouped"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
"go.opentelemetry.io/otel/sdk/resource"
)

// Exporter is an implementation of metric.Exporter that sends metrics to
Expand Down Expand Up @@ -167,7 +168,8 @@ func NewExportPipeline(config Config, period time.Duration) (*push.Controller, h
}

// Export exports the provide metric record to prometheus.
func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error {
func (e *Exporter) Export(_ context.Context, _ *resource.Resource, checkpointSet export.CheckpointSet) error {
// TODO: Use the resource value in this exporter.
e.snapshot = checkpointSet
return nil
}
Expand Down
5 changes: 2 additions & 3 deletions exporters/metric/prometheus/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (

"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/key"
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/exporters/metric/prometheus"
"go.opentelemetry.io/otel/exporters/metric/test"
Expand All @@ -41,7 +40,7 @@ func TestPrometheusExporter(t *testing.T) {
}

var expected []string
checkpointSet := test.NewCheckpointSet(label.DefaultEncoder())
checkpointSet := test.NewCheckpointSet()

counter := metric.NewDescriptor(
"counter", metric.CounterKind, core.Float64NumberKind)
Expand Down Expand Up @@ -119,7 +118,7 @@ func TestPrometheusExporter(t *testing.T) {
}

func compareExport(t *testing.T, exporter *prometheus.Exporter, checkpointSet *test.CheckpointSet, expected []string) {
err := exporter.Export(context.Background(), checkpointSet)
err := exporter.Export(context.Background(), nil, checkpointSet)
require.Nil(t, err)

rec := httptest.NewRecorder()
Expand Down
18 changes: 12 additions & 6 deletions exporters/metric/stdout/stdout.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/sdk/resource"

export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
Expand Down Expand Up @@ -120,8 +121,8 @@ func NewRawExporter(config Config) (*Exporter, error) {
// }
// defer pipeline.Stop()
// ... Done
func InstallNewPipeline(config Config) (*push.Controller, error) {
controller, err := NewExportPipeline(config, time.Minute)
func InstallNewPipeline(config Config, opts ...push.Option) (*push.Controller, error) {
controller, err := NewExportPipeline(config, time.Minute, opts...)
if err != nil {
return controller, err
}
Expand All @@ -131,26 +132,27 @@ func InstallNewPipeline(config Config) (*push.Controller, error) {

// NewExportPipeline sets up a complete export pipeline with the recommended setup,
// chaining a NewRawExporter into the recommended selectors and batchers.
func NewExportPipeline(config Config, period time.Duration) (*push.Controller, error) {
func NewExportPipeline(config Config, period time.Duration, opts ...push.Option) (*push.Controller, error) {
selector := simple.NewWithExactMeasure()
exporter, err := NewRawExporter(config)
if err != nil {
return nil, err
}
batcher := ungrouped.New(selector, true)
pusher := push.New(batcher, exporter, period)
pusher := push.New(batcher, exporter, period, opts...)
pusher.Start()

return pusher, nil
}

func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error {
func (e *Exporter) Export(_ context.Context, resource *resource.Resource, checkpointSet export.CheckpointSet) error {
var aggError error
var batch expoBatch
if !e.config.DoNotPrintTime {
ts := time.Now()
batch.Timestamp = &ts
}
encodedResource := resource.Encoded(e.config.LabelEncoder)
aggError = checkpointSet.ForEach(func(record export.Record) error {
desc := record.Descriptor()
agg := record.Aggregator()
Expand Down Expand Up @@ -224,8 +226,12 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet)

sb.WriteString(desc.Name())

if len(encodedLabels) > 0 {
if len(encodedLabels) > 0 || len(encodedResource) > 0 {
sb.WriteRune('{')
sb.WriteString(encodedResource)
if len(encodedLabels) > 0 && len(encodedResource) > 0 {
sb.WriteRune(',')
}
sb.WriteString(encodedLabels)
sb.WriteRune('}')
}
Expand Down
Loading

0 comments on commit 3008c1b

Please sign in to comment.