diff --git a/sdk/metric/controller/basic/controller.go b/sdk/metric/controller/basic/controller.go index 5579751be46..da863fcaab3 100644 --- a/sdk/metric/controller/basic/controller.go +++ b/sdk/metric/controller/basic/controller.go @@ -95,6 +95,11 @@ func (c *Controller) Meter(instrumentationName string, opts ...metric.MeterOptio newTmp := &initMeterOnce{} m, _ := c.libraries.LoadOrStore(library, newTmp) mo := m.(*initMeterOnce) + + return metric.WrapMeterImpl(c.initializeUniqueMeter(library, mo)) +} + +func (c *Controller) initializeUniqueMeter(library instrumentation.Library, mo *initMeterOnce) *registry.UniqueInstrumentMeterImpl { mo.initOnce.Do(func() { checkpointer := c.checkpointerFactory.NewCheckpointer() accumulator := sdk.NewAccumulator(checkpointer) @@ -104,8 +109,19 @@ func (c *Controller) Meter(instrumentationName string, opts ...metric.MeterOptio library: library, }) }) + return mo.unique +} - return metric.WrapMeterImpl(mo.unique) +// syncMapKeyValueToAccuulatorCheckpointer encapsulates the invariants +// placed on the libraries sync.Map, which is a +// map[instrumentation.Library]*initMeterOnce where the +// registry.UniqueInstrumentMeter's implementation is a +// *accumulatorCheckpointer. +func (c *Controller) syncMapKeyValueToAccumulatorCheckpointer(key, value interface{}) *accumulatorCheckpointer { + return c.initializeUniqueMeter( + key.(instrumentation.Library), + value.(*initMeterOnce), + ).MeterImpl().(*accumulatorCheckpointer) } type accumulatorCheckpointer struct { @@ -245,35 +261,27 @@ func (c *Controller) collect(ctx context.Context) error { return c.export(ctx) } -// accumulatorList returns a snapshot of current accumulators -// registered to this controller. This briefly locks the controller. -func (c *Controller) accumulatorList() []*accumulatorCheckpointer { - c.lock.Lock() - defer c.lock.Unlock() - - var r []*accumulatorCheckpointer - c.libraries.Range(func(_, value interface{}) bool { - mo := value.(*initMeterOnce) - acc, ok := mo.unique.MeterImpl().(*accumulatorCheckpointer) - if ok { - r = append(r, acc) - } - return true - }) - return r -} - // checkpoint calls the Accumulator and Checkpointer interfaces to // compute the Reader. This applies the configured collection // timeout. Note that this does not try to cancel a Collect or Export // when Stop() is called. func (c *Controller) checkpoint(ctx context.Context) error { - for _, impl := range c.accumulatorList() { - if err := c.checkpointSingleAccumulator(ctx, impl); err != nil { - return err + var errs []error + c.libraries.Range(func(key, value interface{}) bool { + acPair := c.syncMapKeyValueToAccumulatorCheckpointer(key, value) + + if err := c.checkpointSingleAccumulator(ctx, acPair); err != nil { + errs = append(errs, err) } + return false + }) + if errs == nil { + return nil } - return nil + if len(errs) == 1 { + return errs[0] + } + return fmt.Errorf("multiple checkpoint errors %w %v", errs[0], errs[1:]) } // checkpointSingleAccumulator checkpoints a single instrumentation @@ -329,18 +337,26 @@ func (c *Controller) export(ctx context.Context) error { // ForEach implements export.InstrumentationLibraryReader. func (c *Controller) ForEach(readerFunc func(l instrumentation.Library, r export.Reader) error) error { - for _, acPair := range c.accumulatorList() { + var errs []error + c.libraries.Range(func(key, value interface{}) bool { + acPair := c.syncMapKeyValueToAccumulatorCheckpointer(key, value) reader := acPair.checkpointer.Reader() - // TODO: We should not fail fast; instead accumulate errors. if err := func() error { reader.RLock() defer reader.RUnlock() return readerFunc(acPair.library, reader) }(); err != nil { - return err + errs = append(errs, err) } + return false + }) + if errs == nil { + return nil } - return nil + if len(errs) == 1 { + return errs[0] + } + return fmt.Errorf("multiple ForEach errors %w %v", errs[0], errs[1:]) } // IsRunning returns true if the controller was started via Start(),