diff --git a/CHANGELOG.md b/CHANGELOG.md index 0b1e158a401..28a2666bc13 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -133,6 +133,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Added codeql worfklow to GitHub Actions (#1428) - Added Gosec workflow to GitHub Actions (#1429) - Add new HTTP driver for OTLP exporter in `exporters/otlp/otlphttp`. Currently it only supports the binary protobuf payloads. (#1420) +- Add an OpenCensus exporter bridge. (#1444) ### Changed diff --git a/bridge/opencensus/README.md b/bridge/opencensus/README.md index 85c3ddbc69d..58ac90c8387 100644 --- a/bridge/opencensus/README.md +++ b/bridge/opencensus/README.md @@ -2,7 +2,9 @@ The OpenCensus Bridge helps facilitate the migration of an application from OpenCensus to OpenTelemetry. -## The Problem: Mixing OpenCensus and OpenTelemetry libraries +## Tracing + +### The Problem: Mixing OpenCensus and OpenTelemetry libraries In a perfect world, one would simply migrate their entire go application --including custom instrumentation, libraries, and exporters-- from OpenCensus to OpenTelemetry all at once. In the real world, dependency constraints, third-party ownership of libraries, or other reasons may require mixing OpenCensus and OpenTelemetry libraries in a single application. @@ -44,10 +46,12 @@ The bridge implements the OpenCensus trace API using OpenTelemetry. This would ### User Journey +Starting from an application using entirely OpenCensus APIs: + 1. Instantiate OpenTelemetry SDK and Exporters 2. Override OpenCensus' DefaultTracer with the bridge -3. Migrate libraries from OpenCensus to OpenTelemetry -4. Remove OpenCensus Exporters +3. Migrate libraries individually from OpenCensus to OpenTelemetry +4. Remove OpenCensus exporters and configuration To override OpenCensus' DefaultTracer with the bridge: ```golang @@ -63,10 +67,56 @@ octrace.DefaultTracer = opencensus.NewTracer(tracer) Be sure to set the `Tracer` name to your instrumentation package name instead of `"bridge"`. -### Incompatibilities +#### Incompatibilities OpenCensus and OpenTelemetry APIs are not entirely compatible. If the bridge finds any incompatibilities, it will log them. Incompatibilities include: * Custom OpenCensus Samplers specified during StartSpan are ignored. * Links cannot be added to OpenCensus spans. * OpenTelemetry Debug or Deferred trace flags are dropped after an OpenCensus span is created. + +## Metrics + +### The problem: mixing libraries without mixing pipelines + +The problem for monitoring is simpler than the problem for tracing, since there +are no context propagation issues to deal with. However, it still is difficult +for users to migrate an entire applications' monitoring at once. It +should be possible to send metrics generated by OpenCensus libraries to an +OpenTelemetry pipeline so that migrating a metric does not require maintaining +separate export pipelines for OpenCensus and OpenTelemetry. + +### The Exporter "wrapper" solution + +The solution we use here is to allow wrapping an OpenTelemetry exporter such +that it implements the OpenCensus exporter interfaces. This allows a single +exporter to be used for metrics from *both* OpenCensus and OpenTelemetry. + +### User Journey + +Starting from an application using entirely OpenCensus APIs: + +1. Instantiate OpenTelemetry SDK and Exporters. +2. Replace OpenCensus exporters with a wrapped OpenTelemetry exporter from step 1. +3. Migrate libraries individually from OpenCensus to OpenTelemetry +4. Remove OpenCensus Exporters and configuration. + +For example, to swap out the OpenCensus logging exporter for the OpenTelemetry stdout exporter: +```golang +import ( + "go.opencensus.io/metric/metricexport" + "go.opentelemetry.io/otel/bridge/opencensus" + "go.opentelemetry.io/otel/exporters/stdout" + "go.opentelemetry.io/otel" +) +// With OpenCensus, you could have previously configured the logging exporter like this: +// import logexporter "go.opencensus.io/examples/exporter" +// exporter, _ := logexporter.NewLogExporter(logexporter.Options{}) +// Instead, we can create an equivalent using the OpenTelemetry stdout exporter: +openTelemetryExporter, _ := stdout.NewExporter(stdout.WithPrettyPrint()) +exporter := opencensus.NewMetricExporter(openTelemetryExporter) + +// Use the wrapped OpenTelemetry exporter like you normally would with OpenCensus +intervalReader, _ := metricexport.NewIntervalReader(&metricexport.Reader{}, exporter) +intervalReader.Start() +``` diff --git a/bridge/opencensus/aggregation.go b/bridge/opencensus/aggregation.go new file mode 100644 index 00000000000..a93f36065e6 --- /dev/null +++ b/bridge/opencensus/aggregation.go @@ -0,0 +1,180 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package opencensus + +import ( + "errors" + "fmt" + "time" + + "go.opencensus.io/metric/metricdata" + + "go.opentelemetry.io/otel/metric/number" + "go.opentelemetry.io/otel/sdk/export/metric/aggregation" +) + +var ( + errIncompatibleType = errors.New("incompatible type for aggregation") + errEmpty = errors.New("points may not be empty") + errBadPoint = errors.New("point cannot be converted") +) + +// aggregationWithEndTime is an aggregation that can also provide the timestamp +// of the last recorded point. +type aggregationWithEndTime interface { + aggregation.Aggregation + end() time.Time +} + +// newAggregationFromPoints creates an OpenTelemetry aggregation from +// OpenCensus points. Points may not be empty and must be either +// all (int|float)64 or all *metricdata.Distribution. +func newAggregationFromPoints(points []metricdata.Point) (aggregationWithEndTime, error) { + if len(points) == 0 { + return nil, errEmpty + } + switch t := points[0].Value.(type) { + case int64: + return newExactAggregator(points) + case float64: + return newExactAggregator(points) + case *metricdata.Distribution: + return newDistributionAggregator(points) + default: + // TODO add *metricdata.Summary support + return nil, fmt.Errorf("%w: %v", errIncompatibleType, t) + } +} + +var _ aggregation.Aggregation = &ocExactAggregator{} +var _ aggregation.LastValue = &ocExactAggregator{} +var _ aggregation.Points = &ocExactAggregator{} + +// newExactAggregator creates an OpenTelemetry aggreation from OpenCensus points. +// Points may not be empty, and must only contain integers or floats. +func newExactAggregator(pts []metricdata.Point) (aggregationWithEndTime, error) { + points := make([]aggregation.Point, len(pts)) + for i, pt := range pts { + switch t := pt.Value.(type) { + case int64: + points[i] = aggregation.Point{ + Number: number.NewInt64Number(pt.Value.(int64)), + Time: pt.Time, + } + case float64: + points[i] = aggregation.Point{ + Number: number.NewFloat64Number(pt.Value.(float64)), + Time: pt.Time, + } + default: + return nil, fmt.Errorf("%w: %v", errIncompatibleType, t) + } + } + return &ocExactAggregator{ + points: points, + }, nil +} + +type ocExactAggregator struct { + points []aggregation.Point +} + +// Kind returns the kind of aggregation this is. +func (o *ocExactAggregator) Kind() aggregation.Kind { + return aggregation.ExactKind +} + +// Points returns access to the raw data set. +func (o *ocExactAggregator) Points() ([]aggregation.Point, error) { + return o.points, nil +} + +// LastValue returns the last point. +func (o *ocExactAggregator) LastValue() (number.Number, time.Time, error) { + last := o.points[len(o.points)-1] + return last.Number, last.Time, nil +} + +// end returns the timestamp of the last point +func (o *ocExactAggregator) end() time.Time { + _, t, _ := o.LastValue() + return t +} + +var _ aggregation.Aggregation = &ocDistAggregator{} +var _ aggregation.Histogram = &ocDistAggregator{} + +// newDistributionAggregator creates an OpenTelemetry aggreation from +// OpenCensus points. Points may not be empty, and must only contain +// Distributions. The most recent disribution will be used in the aggregation. +func newDistributionAggregator(pts []metricdata.Point) (aggregationWithEndTime, error) { + // only use the most recent datapoint for now. + pt := pts[len(pts)-1] + val, ok := pt.Value.(*metricdata.Distribution) + if !ok { + return nil, fmt.Errorf("%w: %v", errBadPoint, pt.Value) + } + bucketCounts := make([]uint64, len(val.Buckets)) + for i, bucket := range val.Buckets { + if bucket.Count < 0 { + return nil, fmt.Errorf("%w: bucket count may not be negative", errBadPoint) + } + bucketCounts[i] = uint64(bucket.Count) + } + if val.Count < 0 { + return nil, fmt.Errorf("%w: count may not be negative", errBadPoint) + } + return &ocDistAggregator{ + sum: number.NewFloat64Number(val.Sum), + count: uint64(val.Count), + buckets: aggregation.Buckets{ + Boundaries: val.BucketOptions.Bounds, + Counts: bucketCounts, + }, + endTime: pts[len(pts)-1].Time, + }, nil +} + +type ocDistAggregator struct { + sum number.Number + count uint64 + buckets aggregation.Buckets + endTime time.Time +} + +// Kind returns the kind of aggregation this is. +func (o *ocDistAggregator) Kind() aggregation.Kind { + return aggregation.HistogramKind +} + +// Sum returns the sum of values. +func (o *ocDistAggregator) Sum() (number.Number, error) { + return o.sum, nil +} + +// Count returns the number of values. +func (o *ocDistAggregator) Count() (uint64, error) { + return o.count, nil +} + +// Histogram returns the count of events in pre-determined buckets. +func (o *ocDistAggregator) Histogram() (aggregation.Buckets, error) { + return o.buckets, nil +} + +// end returns the time the histogram was measured. +func (o *ocDistAggregator) end() time.Time { + return o.endTime +} diff --git a/bridge/opencensus/aggregation_test.go b/bridge/opencensus/aggregation_test.go new file mode 100644 index 00000000000..d6cf8e4944c --- /dev/null +++ b/bridge/opencensus/aggregation_test.go @@ -0,0 +1,341 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package opencensus + +import ( + "errors" + "testing" + "time" + + "go.opencensus.io/metric/metricdata" + + "go.opentelemetry.io/otel/sdk/export/metric/aggregation" +) + +func TestNewAggregationFromPoints(t *testing.T) { + now := time.Now() + for _, tc := range []struct { + desc string + input []metricdata.Point + expectedKind aggregation.Kind + expectedErr error + }{ + { + desc: "no points", + expectedErr: errEmpty, + }, + { + desc: "int point", + input: []metricdata.Point{ + { + Time: now, + Value: int64(23), + }, + }, + expectedKind: aggregation.ExactKind, + }, + { + desc: "float point", + input: []metricdata.Point{ + { + Time: now, + Value: float64(23), + }, + }, + expectedKind: aggregation.ExactKind, + }, + { + desc: "distribution point", + input: []metricdata.Point{ + { + Time: now, + Value: &metricdata.Distribution{ + Count: 2, + Sum: 55, + BucketOptions: &metricdata.BucketOptions{ + Bounds: []float64{20, 30}, + }, + Buckets: []metricdata.Bucket{ + {Count: 1}, + {Count: 1}, + }, + }, + }, + }, + expectedKind: aggregation.HistogramKind, + }, + { + desc: "bad distribution bucket count", + input: []metricdata.Point{ + { + Time: now, + Value: &metricdata.Distribution{ + Count: 2, + Sum: 55, + BucketOptions: &metricdata.BucketOptions{ + Bounds: []float64{20, 30}, + }, + Buckets: []metricdata.Bucket{ + // negative bucket + {Count: -1}, + {Count: 1}, + }, + }, + }, + }, + expectedErr: errBadPoint, + }, + { + desc: "bad distribution count", + input: []metricdata.Point{ + { + Time: now, + Value: &metricdata.Distribution{ + // negative count + Count: -2, + Sum: 55, + BucketOptions: &metricdata.BucketOptions{ + Bounds: []float64{20, 30}, + }, + Buckets: []metricdata.Bucket{ + {Count: 1}, + {Count: 1}, + }, + }, + }, + }, + expectedErr: errBadPoint, + }, + { + desc: "incompatible point type bool", + input: []metricdata.Point{ + { + Time: now, + Value: true, + }, + }, + expectedErr: errIncompatibleType, + }, + { + desc: "dist is incompatible with exact", + input: []metricdata.Point{ + { + Time: now, + Value: int64(23), + }, + { + Time: now, + Value: &metricdata.Distribution{ + Count: 2, + Sum: 55, + BucketOptions: &metricdata.BucketOptions{ + Bounds: []float64{20, 30}, + }, + Buckets: []metricdata.Bucket{ + {Count: 1}, + {Count: 1}, + }, + }, + }, + }, + expectedErr: errIncompatibleType, + }, + { + desc: "int point is incompatible with dist", + input: []metricdata.Point{ + { + Time: now, + Value: &metricdata.Distribution{ + Count: 2, + Sum: 55, + BucketOptions: &metricdata.BucketOptions{ + Bounds: []float64{20, 30}, + }, + Buckets: []metricdata.Bucket{ + {Count: 1}, + {Count: 1}, + }, + }, + }, + { + Time: now, + Value: int64(23), + }, + }, + expectedErr: errBadPoint, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + output, err := newAggregationFromPoints(tc.input) + if !errors.Is(err, tc.expectedErr) { + t.Errorf("newAggregationFromPoints(%v) = err(%v), want err(%v)", tc.input, err, tc.expectedErr) + } + if tc.expectedErr == nil && output.Kind() != tc.expectedKind { + t.Errorf("newAggregationFromPoints(%v) = %v, want %v", tc.input, output.Kind(), tc.expectedKind) + } + }) + } +} + +func TestPointsAggregation(t *testing.T) { + now := time.Now() + input := []metricdata.Point{ + {Value: int64(15)}, + {Value: int64(-23), Time: now}, + } + output, err := newAggregationFromPoints(input) + if err != nil { + t.Fatalf("newAggregationFromPoints(%v) = err(%v), want ", input, err) + } + if output.Kind() != aggregation.ExactKind { + t.Errorf("newAggregationFromPoints(%v) = %v, want %v", input, output.Kind(), aggregation.ExactKind) + } + if output.end() != now { + t.Errorf("newAggregationFromPoints(%v).end() = %v, want %v", input, output.end(), now) + } + pointsAgg, ok := output.(aggregation.Points) + if !ok { + t.Errorf("newAggregationFromPoints(%v) = %v does not implement the aggregation.Points interface", input, output) + } + points, err := pointsAgg.Points() + if err != nil { + t.Fatalf("Unexpected err: %v", err) + } + if len(points) != len(input) { + t.Fatalf("newAggregationFromPoints(%v) resulted in %d points, want %d points", input, len(points), len(input)) + } + for i := range points { + inputPoint := input[i] + outputPoint := points[i] + if inputPoint.Value != outputPoint.AsInt64() { + t.Errorf("newAggregationFromPoints(%v)[%d] = %v, want %v", input, i, outputPoint.AsInt64(), inputPoint.Value) + } + } +} + +func TestLastValueAggregation(t *testing.T) { + now := time.Now() + input := []metricdata.Point{ + {Value: int64(15)}, + {Value: int64(-23), Time: now}, + } + output, err := newAggregationFromPoints(input) + if err != nil { + t.Fatalf("newAggregationFromPoints(%v) = err(%v), want ", input, err) + } + if output.Kind() != aggregation.ExactKind { + t.Errorf("newAggregationFromPoints(%v) = %v, want %v", input, output.Kind(), aggregation.ExactKind) + } + if output.end() != now { + t.Errorf("newAggregationFromPoints(%v).end() = %v, want %v", input, output.end(), now) + } + lvAgg, ok := output.(aggregation.LastValue) + if !ok { + t.Errorf("newAggregationFromPoints(%v) = %v does not implement the aggregation.Points interface", input, output) + } + num, endTime, err := lvAgg.LastValue() + if err != nil { + t.Fatalf("Unexpected err: %v", err) + } + if endTime != now { + t.Errorf("newAggregationFromPoints(%v).LastValue() = endTime: %v, want %v", input, endTime, now) + } + if num.AsInt64() != int64(-23) { + t.Errorf("newAggregationFromPoints(%v).LastValue() = number: %v, want %v", input, num.AsInt64(), int64(-23)) + } +} + +func TestHistogramAggregation(t *testing.T) { + now := time.Now() + input := []metricdata.Point{ + { + Value: &metricdata.Distribution{ + Count: 0, + Sum: 0, + BucketOptions: &metricdata.BucketOptions{ + Bounds: []float64{20, 30}, + }, + Buckets: []metricdata.Bucket{ + {Count: 0}, + {Count: 0}, + }, + }, + }, + { + Time: now, + Value: &metricdata.Distribution{ + Count: 2, + Sum: 55, + BucketOptions: &metricdata.BucketOptions{ + Bounds: []float64{20, 30}, + }, + Buckets: []metricdata.Bucket{ + {Count: 1}, + {Count: 1}, + }, + }, + }, + } + output, err := newAggregationFromPoints(input) + if err != nil { + t.Fatalf("newAggregationFromPoints(%v) = err(%v), want ", input, err) + } + if output.Kind() != aggregation.HistogramKind { + t.Errorf("newAggregationFromPoints(%v) = %v, want %v", input, output.Kind(), aggregation.HistogramKind) + } + if output.end() != now { + t.Errorf("newAggregationFromPoints(%v).end() = %v, want %v", input, output.end(), now) + } + distAgg, ok := output.(aggregation.Histogram) + if !ok { + t.Errorf("newAggregationFromPoints(%v) = %v does not implement the aggregation.Points interface", input, output) + } + sum, err := distAgg.Sum() + if err != nil { + t.Fatalf("Unexpected err: %v", err) + } + if sum.AsFloat64() != float64(55) { + t.Errorf("newAggregationFromPoints(%v).Sum() = %v, want %v", input, sum.AsFloat64(), float64(55)) + } + count, err := distAgg.Count() + if err != nil { + t.Fatalf("Unexpected err: %v", err) + } + if count != 2 { + t.Errorf("newAggregationFromPoints(%v).Count() = %v, want %v", input, count, 2) + } + hist, err := distAgg.Histogram() + if err != nil { + t.Fatalf("Unexpected err: %v", err) + } + inputBucketBoundaries := []float64{20, 30} + if len(hist.Boundaries) != len(inputBucketBoundaries) { + t.Fatalf("newAggregationFromPoints(%v).Histogram() produced %d boundaries, want %d boundaries", input, len(hist.Boundaries), len(inputBucketBoundaries)) + } + for i, b := range hist.Boundaries { + if b != inputBucketBoundaries[i] { + t.Errorf("newAggregationFromPoints(%v).Histogram().Boundaries[%d] = %v, want %v", input, i, b, inputBucketBoundaries[i]) + } + } + inputBucketCounts := []uint64{1, 1} + if len(hist.Counts) != len(inputBucketCounts) { + t.Fatalf("newAggregationFromPoints(%v).Histogram() produced %d buckets, want %d buckets", input, len(hist.Counts), len(inputBucketCounts)) + } + for i, c := range hist.Counts { + if c != inputBucketCounts[i] { + t.Errorf("newAggregationFromPoints(%v).Histogram().Counts[%d] = %d, want %d", input, i, c, inputBucketCounts[i]) + } + } +} diff --git a/bridge/opencensus/exporter.go b/bridge/opencensus/exporter.go new file mode 100644 index 00000000000..c73ea891c68 --- /dev/null +++ b/bridge/opencensus/exporter.go @@ -0,0 +1,168 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package opencensus + +import ( + "context" + "errors" + "fmt" + "sync" + + "go.opencensus.io/metric/metricdata" + "go.opencensus.io/metric/metricexport" + ocresource "go.opencensus.io/resource" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/number" + export "go.opentelemetry.io/otel/sdk/export/metric" + "go.opentelemetry.io/otel/sdk/export/metric/aggregation" + "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/otel/unit" +) + +var errConversion = errors.New("Unable to convert from OpenCensus to OpenTelemetry") + +// NewMetricExporter returns an OpenCensus exporter that exports to an +// OpenTelemetry exporter +func NewMetricExporter(base export.Exporter) metricexport.Exporter { + return &exporter{base: base} +} + +// exporter implements the OpenCensus metric Exporter interface using an +// OpenTelemetry base exporter. +type exporter struct { + base export.Exporter +} + +// ExportMetrics implements the OpenCensus metric Exporter interface +func (e *exporter) ExportMetrics(ctx context.Context, metrics []*metricdata.Metric) error { + return e.base.Export(ctx, &checkpointSet{metrics: metrics}) +} + +type checkpointSet struct { + // RWMutex implements locking for the `CheckpointSet` interface. + sync.RWMutex + metrics []*metricdata.Metric +} + +// ForEach iterates through the CheckpointSet, passing an +// export.Record with the appropriate aggregation to an exporter. +func (d *checkpointSet) ForEach(exporter export.ExportKindSelector, f func(export.Record) error) error { + for _, m := range d.metrics { + descriptor, err := convertDescriptor(m.Descriptor) + if err != nil { + otel.Handle(err) + continue + } + res := convertResource(m.Resource) + for _, ts := range m.TimeSeries { + if len(ts.Points) == 0 { + continue + } + ls, err := convertLabels(m.Descriptor.LabelKeys, ts.LabelValues) + if err != nil { + otel.Handle(err) + continue + } + agg, err := newAggregationFromPoints(ts.Points) + if err != nil { + otel.Handle(err) + continue + } + if err := f(export.NewRecord( + &descriptor, + &ls, + res, + agg, + ts.StartTime, + agg.end(), + )); err != nil && !errors.Is(err, aggregation.ErrNoData) { + return err + } + } + } + return nil +} + +// convertLabels converts from OpenCensus label keys and values to an +// OpenTelemetry label Set. +func convertLabels(keys []metricdata.LabelKey, values []metricdata.LabelValue) (attribute.Set, error) { + if len(keys) != len(values) { + return attribute.NewSet(), fmt.Errorf("%w different number of label keys (%d) and values (%d)", errConversion, len(keys), len(values)) + } + labels := []attribute.KeyValue{} + for i, lv := range values { + if !lv.Present { + continue + } + labels = append(labels, attribute.KeyValue{ + Key: attribute.Key(keys[i].Key), + Value: attribute.StringValue(lv.Value), + }) + } + return attribute.NewSet(labels...), nil +} + +// convertResource converts an OpenCensus Resource to an OpenTelemetry Resource +func convertResource(res *ocresource.Resource) *resource.Resource { + labels := []attribute.KeyValue{} + if res == nil { + return nil + } + for k, v := range res.Labels { + labels = append(labels, attribute.KeyValue{Key: attribute.Key(k), Value: attribute.StringValue(v)}) + } + return resource.NewWithAttributes(labels...) +} + +// convertDescriptor converts an OpenCensus Descriptor to an OpenTelemetry Descriptor +func convertDescriptor(ocDescriptor metricdata.Descriptor) (metric.Descriptor, error) { + var ( + nkind number.Kind + ikind metric.InstrumentKind + ) + switch ocDescriptor.Type { + case metricdata.TypeGaugeInt64: + nkind = number.Int64Kind + ikind = metric.ValueObserverInstrumentKind + case metricdata.TypeGaugeFloat64: + nkind = number.Float64Kind + ikind = metric.ValueObserverInstrumentKind + case metricdata.TypeCumulativeInt64: + nkind = number.Int64Kind + ikind = metric.SumObserverInstrumentKind + case metricdata.TypeCumulativeFloat64: + nkind = number.Float64Kind + ikind = metric.SumObserverInstrumentKind + default: + // Includes TypeGaugeDistribution, TypeCumulativeDistribution, TypeSummary + return metric.Descriptor{}, fmt.Errorf("%w; descriptor type: %v", errConversion, ocDescriptor.Type) + } + opts := []metric.InstrumentOption{ + metric.WithDescription(ocDescriptor.Description), + metric.WithInstrumentationName("OpenCensus Bridge"), + } + switch ocDescriptor.Unit { + case metricdata.UnitDimensionless: + opts = append(opts, metric.WithUnit(unit.Dimensionless)) + case metricdata.UnitBytes: + opts = append(opts, metric.WithUnit(unit.Bytes)) + case metricdata.UnitMilliseconds: + opts = append(opts, metric.WithUnit(unit.Milliseconds)) + } + return metric.NewDescriptor(ocDescriptor.Name, ikind, nkind, opts...), nil +} diff --git a/bridge/opencensus/exporter_test.go b/bridge/opencensus/exporter_test.go new file mode 100644 index 00000000000..c15ba4b6812 --- /dev/null +++ b/bridge/opencensus/exporter_test.go @@ -0,0 +1,480 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package opencensus + +import ( + "context" + "errors" + "fmt" + "testing" + "time" + + "go.opentelemetry.io/otel" + + "go.opencensus.io/metric/metricdata" + ocresource "go.opencensus.io/resource" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/number" + export "go.opentelemetry.io/otel/sdk/export/metric" + exportmetric "go.opentelemetry.io/otel/sdk/export/metric" + "go.opentelemetry.io/otel/sdk/export/metric/aggregation" + "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/otel/unit" +) + +type fakeExporter struct { + export.Exporter + records []export.Record + err error +} + +func (f *fakeExporter) Export(ctx context.Context, cps exportmetric.CheckpointSet) error { + return cps.ForEach(f, func(record exportmetric.Record) error { + f.records = append(f.records, record) + return f.err + }) +} + +type fakeErrorHandler struct { + err error +} + +func (f *fakeErrorHandler) Handle(err error) { + f.err = err +} + +func (f *fakeErrorHandler) matches(err error) error { + // make sure err is cleared for the next test + defer func() { f.err = nil }() + if !errors.Is(f.err, err) { + return fmt.Errorf("err(%v), want err(%v)", f.err, err) + } + return nil +} + +func TestExportMetrics(t *testing.T) { + now := time.Now() + basicDesc := metric.NewDescriptor( + "", + metric.ValueObserverInstrumentKind, + number.Int64Kind, + metric.WithInstrumentationName("OpenCensus Bridge"), + ) + fakeErrorHandler := &fakeErrorHandler{} + otel.SetErrorHandler(fakeErrorHandler) + for _, tc := range []struct { + desc string + input []*metricdata.Metric + exportErr error + expected []export.Record + expectedHandledError error + }{ + { + desc: "no metrics", + }, + { + desc: "metric without points is dropped", + input: []*metricdata.Metric{ + { + TimeSeries: []*metricdata.TimeSeries{ + {}, + }, + }, + }, + }, + { + desc: "descriptor conversion error", + input: []*metricdata.Metric{ + // TypeGaugeDistribution isn't supported + {Descriptor: metricdata.Descriptor{Type: metricdata.TypeGaugeDistribution}}, + }, + expectedHandledError: errConversion, + }, + { + desc: "labels conversion error", + input: []*metricdata.Metric{ + { + // No descriptor with label keys. + TimeSeries: []*metricdata.TimeSeries{ + // 1 label value, which doens't exist in keys. + { + LabelValues: []metricdata.LabelValue{{Value: "foo", Present: true}}, + Points: []metricdata.Point{ + {}, + }, + }, + }, + }, + }, + expectedHandledError: errConversion, + }, + { + desc: "unsupported summary point type", + input: []*metricdata.Metric{ + { + TimeSeries: []*metricdata.TimeSeries{ + { + Points: []metricdata.Point{ + {Value: &metricdata.Summary{}}, + }, + }, + }, + }, + }, + expectedHandledError: errIncompatibleType, + }, + { + desc: "success", + input: []*metricdata.Metric{ + { + TimeSeries: []*metricdata.TimeSeries{ + { + StartTime: now, + Points: []metricdata.Point{ + {Value: int64(123), Time: now}, + }, + }, + }, + }, + }, + expected: []export.Record{ + export.NewRecord( + &basicDesc, + attribute.EmptySet(), + resource.NewWithAttributes(), + &ocExactAggregator{ + points: []aggregation.Point{ + { + Number: number.NewInt64Number(123), + Time: now, + }, + }, + }, + now, + now, + ), + }, + }, + { + desc: "export error after success", + input: []*metricdata.Metric{ + { + TimeSeries: []*metricdata.TimeSeries{ + { + StartTime: now, + Points: []metricdata.Point{ + {Value: int64(123), Time: now}, + }, + }, + }, + }, + }, + expected: []export.Record{ + export.NewRecord( + &basicDesc, + attribute.EmptySet(), + resource.NewWithAttributes(), + &ocExactAggregator{ + points: []aggregation.Point{ + { + Number: number.NewInt64Number(123), + Time: now, + }, + }, + }, + now, + now, + ), + }, + exportErr: errors.New("failed to export"), + }, + { + desc: "partial success sends correct metrics and drops incorrect metrics with handled err", + input: []*metricdata.Metric{ + { + TimeSeries: []*metricdata.TimeSeries{ + { + StartTime: now, + Points: []metricdata.Point{ + {Value: int64(123), Time: now}, + }, + }, + }, + }, + // TypeGaugeDistribution isn't supported + {Descriptor: metricdata.Descriptor{Type: metricdata.TypeGaugeDistribution}}, + }, + expected: []export.Record{ + export.NewRecord( + &basicDesc, + attribute.EmptySet(), + resource.NewWithAttributes(), + &ocExactAggregator{ + points: []aggregation.Point{ + { + Number: number.NewInt64Number(123), + Time: now, + }, + }, + }, + now, + now, + ), + }, + expectedHandledError: errConversion, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + fakeExporter := &fakeExporter{err: tc.exportErr} + err := NewMetricExporter(fakeExporter).ExportMetrics(context.Background(), tc.input) + if !errors.Is(err, tc.exportErr) { + t.Errorf("NewMetricExporter(%+v) = err(%v), want err(%v)", tc.input, err, tc.exportErr) + } + // Check the global error handler, since we don't return errors + // which occur during conversion. + err = fakeErrorHandler.matches(tc.expectedHandledError) + if err != nil { + t.Fatalf("ExportMetrics(%+v) = %v", tc.input, err) + } + output := fakeExporter.records + if len(tc.expected) != len(output) { + t.Fatalf("ExportMetrics(%+v) = %d records, want %d records", tc.input, len(output), len(tc.expected)) + } + for i, expected := range tc.expected { + if output[i].StartTime() != expected.StartTime() { + t.Errorf("ExportMetrics(%+v)[i].StartTime() = %+v, want %+v", tc.input, output[i].StartTime(), expected.StartTime()) + } + if output[i].EndTime() != expected.EndTime() { + t.Errorf("ExportMetrics(%+v)[i].EndTime() = %+v, want %+v", tc.input, output[i].EndTime(), expected.EndTime()) + } + if output[i].Resource().String() != expected.Resource().String() { + t.Errorf("ExportMetrics(%+v)[i].Resource() = %+v, want %+v", tc.input, output[i].Resource().String(), expected.Resource().String()) + } + if output[i].Descriptor().Name() != expected.Descriptor().Name() { + t.Errorf("ExportMetrics(%+v)[i].Descriptor() = %+v, want %+v", tc.input, output[i].Descriptor().Name(), expected.Descriptor().Name()) + } + // Don't bother with a complete check of the descriptor. + // That is checked as part of descriptor conversion tests below. + if !output[i].Labels().Equals(expected.Labels()) { + t.Errorf("ExportMetrics(%+v)[i].Labels() = %+v, want %+v", tc.input, output[i].Labels(), expected.Labels()) + } + if output[i].Aggregation().Kind() != expected.Aggregation().Kind() { + t.Errorf("ExportMetrics(%+v)[i].Aggregation() = %+v, want %+v", tc.input, output[i].Aggregation().Kind(), expected.Aggregation().Kind()) + } + // Don't bother checking the contents of the points aggregation. + // Those tests are done with the aggregations themselves + } + }) + } +} + +func TestConvertLabels(t *testing.T) { + setWithMultipleKeys := attribute.NewSet( + attribute.KeyValue{Key: attribute.Key("first"), Value: attribute.StringValue("1")}, + attribute.KeyValue{Key: attribute.Key("second"), Value: attribute.StringValue("2")}, + ) + for _, tc := range []struct { + desc string + inputKeys []metricdata.LabelKey + inputValues []metricdata.LabelValue + expected *attribute.Set + expectedErr error + }{ + { + desc: "no labels", + expected: attribute.EmptySet(), + }, + { + desc: "different numbers of keys and values", + inputKeys: []metricdata.LabelKey{{Key: "foo"}}, + expected: attribute.EmptySet(), + expectedErr: errConversion, + }, + { + desc: "multiple keys and values", + inputKeys: []metricdata.LabelKey{{Key: "first"}, {Key: "second"}}, + inputValues: []metricdata.LabelValue{ + {Value: "1", Present: true}, + {Value: "2", Present: true}, + }, + expected: &setWithMultipleKeys, + }, + { + desc: "multiple keys and values with some not present", + inputKeys: []metricdata.LabelKey{{Key: "first"}, {Key: "second"}, {Key: "third"}}, + inputValues: []metricdata.LabelValue{ + {Value: "1", Present: true}, + {Value: "2", Present: true}, + {Present: false}, + }, + expected: &setWithMultipleKeys, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + output, err := convertLabels(tc.inputKeys, tc.inputValues) + if !errors.Is(err, tc.expectedErr) { + t.Errorf("convertLabels(keys: %v, values: %v) = err(%v), want err(%v)", tc.inputKeys, tc.inputValues, err, tc.expectedErr) + } + if !output.Equals(tc.expected) { + t.Errorf("convertLabels(keys: %v, values: %v) = %+v, want %+v", tc.inputKeys, tc.inputValues, output.ToSlice(), tc.expected.ToSlice()) + } + }) + } +} +func TestConvertResource(t *testing.T) { + for _, tc := range []struct { + desc string + input *ocresource.Resource + expected *resource.Resource + }{ + { + desc: "nil resource", + }, + { + desc: "empty resource", + input: &ocresource.Resource{ + Labels: map[string]string{}, + }, + expected: resource.NewWithAttributes(), + }, + { + desc: "resource with labels", + input: &ocresource.Resource{ + Labels: map[string]string{ + "foo": "bar", + "tick": "tock", + }, + }, + expected: resource.NewWithAttributes( + attribute.KeyValue{Key: attribute.Key("foo"), Value: attribute.StringValue("bar")}, + attribute.KeyValue{Key: attribute.Key("tick"), Value: attribute.StringValue("tock")}, + ), + }, + } { + t.Run(tc.desc, func(t *testing.T) { + output := convertResource(tc.input) + if !output.Equal(tc.expected) { + t.Errorf("convertResource(%v) = %+v, want %+v", tc.input, output, tc.expected) + } + }) + } +} +func TestConvertDescriptor(t *testing.T) { + for _, tc := range []struct { + desc string + input metricdata.Descriptor + expected metric.Descriptor + expectedErr error + }{ + { + desc: "empty descriptor", + expected: metric.NewDescriptor( + "", + metric.ValueObserverInstrumentKind, + number.Int64Kind, + metric.WithInstrumentationName("OpenCensus Bridge"), + ), + }, + { + desc: "gauge int64 bytes", + input: metricdata.Descriptor{ + Name: "foo", + Description: "bar", + Unit: metricdata.UnitBytes, + Type: metricdata.TypeGaugeInt64, + }, + expected: metric.NewDescriptor( + "foo", + metric.ValueObserverInstrumentKind, + number.Int64Kind, + metric.WithInstrumentationName("OpenCensus Bridge"), + metric.WithDescription("bar"), + metric.WithUnit(unit.Bytes), + ), + }, + { + desc: "gauge float64 ms", + input: metricdata.Descriptor{ + Name: "foo", + Description: "bar", + Unit: metricdata.UnitMilliseconds, + Type: metricdata.TypeGaugeFloat64, + }, + expected: metric.NewDescriptor( + "foo", + metric.ValueObserverInstrumentKind, + number.Float64Kind, + metric.WithInstrumentationName("OpenCensus Bridge"), + metric.WithDescription("bar"), + metric.WithUnit(unit.Milliseconds), + ), + }, + { + desc: "cumulative int64 dimensionless", + input: metricdata.Descriptor{ + Name: "foo", + Description: "bar", + Unit: metricdata.UnitDimensionless, + Type: metricdata.TypeCumulativeInt64, + }, + expected: metric.NewDescriptor( + "foo", + metric.SumObserverInstrumentKind, + number.Int64Kind, + metric.WithInstrumentationName("OpenCensus Bridge"), + metric.WithDescription("bar"), + metric.WithUnit(unit.Dimensionless), + ), + }, + { + desc: "cumulative float64 dimensionless", + input: metricdata.Descriptor{ + Name: "foo", + Description: "bar", + Unit: metricdata.UnitDimensionless, + Type: metricdata.TypeCumulativeFloat64, + }, + expected: metric.NewDescriptor( + "foo", + metric.SumObserverInstrumentKind, + number.Float64Kind, + metric.WithInstrumentationName("OpenCensus Bridge"), + metric.WithDescription("bar"), + metric.WithUnit(unit.Dimensionless), + ), + }, + { + desc: "incompatible TypeCumulativeDistribution", + input: metricdata.Descriptor{ + Name: "foo", + Description: "bar", + Type: metricdata.TypeCumulativeDistribution, + }, + expectedErr: errConversion, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + output, err := convertDescriptor(tc.input) + if !errors.Is(err, tc.expectedErr) { + t.Errorf("convertDescriptor(%v) = err(%v), want err(%v)", tc.input, err, tc.expectedErr) + } + if output != tc.expected { + t.Errorf("convertDescriptor(%v) = %+v, want %+v", tc.input, output, tc.expected) + } + }) + } +} diff --git a/bridge/opencensus/go.mod b/bridge/opencensus/go.mod index 12bc47be23a..22a2bc5ddda 100644 --- a/bridge/opencensus/go.mod +++ b/bridge/opencensus/go.mod @@ -5,7 +5,10 @@ go 1.14 require ( go.opencensus.io v0.22.6-0.20201102222123-380f4078db9f go.opentelemetry.io/otel v0.18.0 + go.opentelemetry.io/otel/metric v0.18.0 go.opentelemetry.io/otel/oteltest v0.18.0 + go.opentelemetry.io/otel/sdk v0.18.0 + go.opentelemetry.io/otel/sdk/export/metric v0.0.0-00010101000000-000000000000 go.opentelemetry.io/otel/trace v0.18.0 ) diff --git a/bridge/opencensus/go.sum b/bridge/opencensus/go.sum index f80834326d5..41a6cf3bccb 100644 --- a/bridge/opencensus/go.sum +++ b/bridge/opencensus/go.sum @@ -8,6 +8,7 @@ github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 h1:ZgQEtGgCBiWRM github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= diff --git a/example/opencensus/go.mod b/example/opencensus/go.mod index 060edde6a60..a6d84831786 100644 --- a/example/opencensus/go.mod +++ b/example/opencensus/go.mod @@ -15,6 +15,7 @@ require ( go.opentelemetry.io/otel/bridge/opencensus v0.18.0 go.opentelemetry.io/otel/exporters/stdout v0.18.0 go.opentelemetry.io/otel/sdk v0.18.0 + go.opentelemetry.io/otel/sdk/export/metric v0.18.0 ) replace go.opentelemetry.io/otel/bridge/opentracing => ../../bridge/opentracing diff --git a/example/opencensus/main.go b/example/opencensus/main.go index b11f1340afe..258bcdc28bb 100644 --- a/example/opencensus/main.go +++ b/example/opencensus/main.go @@ -17,26 +17,58 @@ package main import ( "context" "log" + "time" + "go.opencensus.io/metric/metricdata" + + "go.opencensus.io/metric" + "go.opencensus.io/metric/metricexport" + "go.opencensus.io/metric/metricproducer" + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" octrace "go.opencensus.io/trace" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/bridge/opencensus" "go.opentelemetry.io/otel/exporters/stdout" + otmetricexport "go.opentelemetry.io/otel/sdk/export/metric" + ottraceexport "go.opentelemetry.io/otel/sdk/export/trace" sdktrace "go.opentelemetry.io/otel/sdk/trace" ) +var ( + // instrumenttype differentiates between our gauge and view metrics. + keyType = tag.MustNewKey("instrumenttype") + // Counts the number of lines read in from standard input + countMeasure = stats.Int64("test_count", "A count of something", stats.UnitDimensionless) + countView = &view.View{ + Name: "test_count", + Measure: countMeasure, + Description: "A count of something", + Aggregation: view.Count(), + TagKeys: []tag.Key{keyType}, + } +) + func main() { + log.Println("Using OpenTelemetry stdout exporter.") + otExporter, err := stdout.NewExporter(stdout.WithPrettyPrint()) + if err != nil { + log.Fatal(err) + } + tracing(otExporter) + monitoring(otExporter) +} + +// tracing demonstrates overriding the OpenCensus DefaultTracer to send spans +// to the OpenTelemetry exporter by calling OpenCensus APIs. +func tracing(otExporter ottraceexport.SpanExporter) { ctx := context.Background() log.Println("Configuring OpenCensus. Not Registering any OpenCensus exporters.") octrace.ApplyConfig(octrace.Config{DefaultSampler: octrace.AlwaysSample()}) - log.Println("Registering OpenTelemetry stdout exporter.") - otExporter, err := stdout.NewExporter(stdout.WithPrettyPrint()) - if err != nil { - log.Fatal(err) - } tp := sdktrace.NewTracerProvider(sdktrace.WithSyncer(otExporter)) otel.SetTracerProvider(tp) @@ -56,3 +88,56 @@ func main() { _, innerOCSpan := octrace.StartSpan(ctx, "OpenCensusInnerSpan") innerOCSpan.End() } + +// monitoring demonstrates creating an IntervalReader using the OpenTelemetry +// exporter to send metrics to the exporter by using either an OpenCensus +// registry or an OpenCensus view. +func monitoring(otExporter otmetricexport.Exporter) { + log.Println("Using the OpenTelemetry stdout exporter to export OpenCensus metrics. This allows routing telemetry from both OpenTelemetry and OpenCensus to a single exporter.") + ocExporter := opencensus.NewMetricExporter(otExporter) + intervalReader, err := metricexport.NewIntervalReader(&metricexport.Reader{}, ocExporter) + if err != nil { + log.Fatalf("Failed to create interval reader: %v\n", err) + } + intervalReader.ReportingInterval = 10 * time.Second + log.Println("Emitting metrics using OpenCensus APIs. These should be printed out using the OpenTelemetry stdout exporter.") + err = intervalReader.Start() + if err != nil { + log.Fatalf("Failed to start interval reader: %v\n", err) + } + defer intervalReader.Stop() + + log.Println("Registering a gauge metric using an OpenCensus registry.") + r := metric.NewRegistry() + metricproducer.GlobalManager().AddProducer(r) + gauge, err := r.AddInt64Gauge( + "test_gauge", + metric.WithDescription("A gauge for testing"), + metric.WithConstLabel(map[metricdata.LabelKey]metricdata.LabelValue{ + {Key: keyType.Name()}: metricdata.NewLabelValue("gauge"), + }), + ) + if err != nil { + log.Fatalf("Failed to add gauge: %v\n", err) + } + entry, err := gauge.GetEntry() + if err != nil { + log.Fatalf("Failed to get gauge entry: %v\n", err) + } + + log.Println("Registering a cumulative metric using an OpenCensus view.") + if err := view.Register(countView); err != nil { + log.Fatalf("Failed to register views: %v", err) + } + ctx, err := tag.New(context.Background(), tag.Insert(keyType, "view")) + if err != nil { + log.Fatalf("Failed to set tag: %v\n", err) + } + for i := int64(1); true; i++ { + // update stats for our gauge + entry.Set(i) + // update stats for our view + stats.Record(ctx, countMeasure.M(1)) + time.Sleep(time.Second) + } +}