Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Another batch of cleanups in otlp exporter #1357

Merged
merged 13 commits into from
Nov 24, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Changed

- Move the OpenCensus example into `example` directory. (#1359)
- `NewExporter` and `Start` functions in `go.opentelemetry.io/otel/exporters/otlp` now receive `context.Context` as a first parameter. (#1357)

## [0.14.0] - 2020-11-19

Expand Down
2 changes: 1 addition & 1 deletion example/otel-collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func initProvider() func() {
// `localhost:30080` address. Otherwise, replace `localhost` with the
// address of your cluster. If you run the app inside k8s, then you can
// probably connect directly to the service through dns
exp, err := otlp.NewExporter(
exp, err := otlp.NewExporter(ctx,
otlp.WithInsecure(),
otlp.WithAddress("localhost:30080"),
otlp.WithGRPCDialOption(grpc.WithBlock()), // useful for testing
Expand Down
4 changes: 2 additions & 2 deletions exporters/otlp/alignment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (
func TestMain(m *testing.M) {
fields := []ottest.FieldOffset{
{
Name: "Exporter.lastConnectErrPtr",
Offset: unsafe.Offsetof(Exporter{}.lastConnectErrPtr),
Name: "grpcConnection.lastConnectErrPtr",
Offset: unsafe.Offsetof(grpcConnection{}.lastConnectErrPtr),
},
}
if !ottest.Aligned8Byte(fields, os.Stderr) {
Expand Down
209 changes: 186 additions & 23 deletions exporters/otlp/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,52 +15,113 @@
package otlp // import "go.opentelemetry.io/otel/exporters/otlp"

import (
"context"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
"unsafe"

"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

func (e *Exporter) lastConnectError() error {
errPtr := (*error)(atomic.LoadPointer(&e.lastConnectErrPtr))
type grpcConnection struct {
// Ensure pointer is 64-bit aligned for atomic operations on both 32 and 64 bit machines.
lastConnectErrPtr unsafe.Pointer

// mu protects the connection as it is accessed by the
// exporter goroutines and background connection goroutine
mu sync.Mutex
cc *grpc.ClientConn

// these fields are read-only after constructor is finished
c config
metadata metadata.MD
newConnectionHandler func(cc *grpc.ClientConn) error

// these channels are created once
disconnectedCh chan bool
backgroundConnectionDoneCh chan struct{}
stopCh chan struct{}

// this is for tests, so they can replace the closing
// routine without a worry of modifying some global variable
// or changing it back to original after the test is done
closeBackgroundConnectionDoneCh func(ch chan struct{})
}

func newGRPCConnection(c config, handler func(cc *grpc.ClientConn) error) *grpcConnection {
conn := new(grpcConnection)
conn.newConnectionHandler = handler
if c.collectorAddr == "" {
c.collectorAddr = fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorPort)
}
conn.c = c
if len(conn.c.headers) > 0 {
conn.metadata = metadata.New(conn.c.headers)
}
conn.closeBackgroundConnectionDoneCh = func(ch chan struct{}) {
close(ch)
}
return conn
}

func (oc *grpcConnection) startConnection(ctx context.Context) {
oc.stopCh = make(chan struct{})
oc.disconnectedCh = make(chan bool)
oc.backgroundConnectionDoneCh = make(chan struct{})

if err := oc.connect(ctx); err == nil {
oc.setStateConnected()
} else {
oc.setStateDisconnected(err)
}
go oc.indefiniteBackgroundConnection()
}

func (oc *grpcConnection) lastConnectError() error {
errPtr := (*error)(atomic.LoadPointer(&oc.lastConnectErrPtr))
if errPtr == nil {
return nil
}
return *errPtr
}

func (e *Exporter) saveLastConnectError(err error) {
func (oc *grpcConnection) saveLastConnectError(err error) {
var errPtr *error
if err != nil {
errPtr = &err
}
atomic.StorePointer(&e.lastConnectErrPtr, unsafe.Pointer(errPtr))
atomic.StorePointer(&oc.lastConnectErrPtr, unsafe.Pointer(errPtr))
}

func (e *Exporter) setStateDisconnected(err error) {
e.saveLastConnectError(err)
func (oc *grpcConnection) setStateDisconnected(err error) {
oc.saveLastConnectError(err)
select {
case e.disconnectedCh <- true:
case oc.disconnectedCh <- true:
default:
}
_ = oc.newConnectionHandler(nil)
}

func (e *Exporter) setStateConnected() {
e.saveLastConnectError(nil)
func (oc *grpcConnection) setStateConnected() {
oc.saveLastConnectError(nil)
}

func (e *Exporter) connected() bool {
return e.lastConnectError() == nil
func (oc *grpcConnection) connected() bool {
return oc.lastConnectError() == nil
}

const defaultConnReattemptPeriod = 10 * time.Second

func (e *Exporter) indefiniteBackgroundConnection() {
func (oc *grpcConnection) indefiniteBackgroundConnection() {
defer func() {
e.backgroundConnectionDoneCh <- true
oc.closeBackgroundConnectionDoneCh(oc.backgroundConnectionDoneCh)
}()

connReattemptPeriod := e.c.reconnectionPeriod
connReattemptPeriod := oc.c.reconnectionPeriod
if connReattemptPeriod <= 0 {
connReattemptPeriod = defaultConnReattemptPeriod
}
Expand All @@ -79,35 +140,137 @@ func (e *Exporter) indefiniteBackgroundConnection() {
// 2. Otherwise block until we are disconnected, and
// then retry connecting
select {
case <-e.stopCh:
case <-oc.stopCh:
return

case <-e.disconnectedCh:
case <-oc.disconnectedCh:
// Quickly check if we haven't stopped at the
// same time.
select {
case <-oc.stopCh:
return

default:
}

// Normal scenario that we'll wait for
}

if err := e.connect(); err == nil {
e.setStateConnected()
if err := oc.connect(context.Background()); err == nil {
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
oc.setStateConnected()
} else {
e.setStateDisconnected(err)
oc.setStateDisconnected(err)
}

// Apply some jitter to avoid lockstep retrials of other
// collector-exporters. Lockstep retrials could result in an
// innocent DDOS, by clogging the machine's resources and network.
jitter := time.Duration(rng.Int63n(maxJitterNanos))
select {
case <-e.stopCh:
case <-oc.stopCh:
return
case <-time.After(connReattemptPeriod + jitter):
}
}
}

func (e *Exporter) connect() error {
cc, err := e.dialToCollector()
func (oc *grpcConnection) connect(ctx context.Context) error {
cc, err := oc.dialToCollector(ctx)
if err != nil {
return err
}
return e.enableConnections(cc)
oc.setConnection(cc)
return oc.newConnectionHandler(cc)
}

// setConnection sets cc as the client connection and returns true if
// the connection state changed.
func (oc *grpcConnection) setConnection(cc *grpc.ClientConn) bool {
oc.mu.Lock()
defer oc.mu.Unlock()

// If previous clientConn is same as the current then just return.
// This doesn't happen right now as this func is only called with new ClientConn.
// It is more about future-proofing.
if oc.cc == cc {
return false
}

// If the previous clientConn was non-nil, close it
if oc.cc != nil {
_ = oc.cc.Close()
}
oc.cc = cc
return true
}

func (oc *grpcConnection) dialToCollector(ctx context.Context) (*grpc.ClientConn, error) {
addr := oc.c.collectorAddr

dialOpts := []grpc.DialOption{}
if oc.c.grpcServiceConfig != "" {
dialOpts = append(dialOpts, grpc.WithDefaultServiceConfig(oc.c.grpcServiceConfig))
}
if oc.c.clientCredentials != nil {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(oc.c.clientCredentials))
} else if oc.c.canDialInsecure {
dialOpts = append(dialOpts, grpc.WithInsecure())
}
if oc.c.compressor != "" {
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(oc.c.compressor)))
}
if len(oc.c.grpcDialOptions) != 0 {
dialOpts = append(dialOpts, oc.c.grpcDialOptions...)
}

ctx, cancel := oc.contextWithStop(ctx)
defer cancel()
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
ctx = oc.contextWithMetadata(ctx)
return grpc.DialContext(ctx, addr, dialOpts...)
}

func (oc *grpcConnection) contextWithMetadata(ctx context.Context) context.Context {
if oc.metadata.Len() > 0 {
return metadata.NewOutgoingContext(ctx, oc.metadata)
}
return ctx
}

func (oc *grpcConnection) shutdown(ctx context.Context) error {
close(oc.stopCh)
// Ensure that the backgroundConnector returns
select {
case <-oc.backgroundConnectionDoneCh:
case <-ctx.Done():
return ctx.Err()
}

close(oc.disconnectedCh)

oc.mu.Lock()
cc := oc.cc
oc.cc = nil
oc.mu.Unlock()

if cc != nil {
return cc.Close()
}

return nil
}

func (oc *grpcConnection) contextWithStop(ctx context.Context) (context.Context, context.CancelFunc) {
// Unify the parent context Done signal with the connection's
// stop channel.
ctx, cancel := context.WithCancel(ctx)
go func(ctx context.Context, cancel context.CancelFunc) {
select {
case <-ctx.Done():
// Nothing to do, either cancelled or deadline
// happened.
case <-oc.stopCh:
cancel()
}
}(ctx, cancel)
return ctx, cancel
}
14 changes: 8 additions & 6 deletions exporters/otlp/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ import (
)

func Example_insecure() {
exp, err := otlp.NewExporter(otlp.WithInsecure())
ctx := context.Background()
exp, err := otlp.NewExporter(ctx, otlp.WithInsecure())
if err != nil {
log.Fatalf("Failed to create the collector exporter: %v", err)
}
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
if err := exp.Shutdown(ctx); err != nil {
otel.Handle(err)
Expand All @@ -54,7 +55,7 @@ func Example_insecure() {
tracer := otel.Tracer("test-tracer")

// Then use the OpenTelemetry tracing library, like we normally would.
ctx, span := tracer.Start(context.Background(), "CollectorExporter-Example")
ctx, span := tracer.Start(ctx, "CollectorExporter-Example")
defer span.End()

for i := 0; i < 10; i++ {
Expand All @@ -72,12 +73,13 @@ func Example_withTLS() {
log.Fatalf("failed to create gRPC client TLS credentials: %v", err)
}

exp, err := otlp.NewExporter(otlp.WithTLSCredentials(creds))
ctx := context.Background()
exp, err := otlp.NewExporter(ctx, otlp.WithTLSCredentials(creds))
if err != nil {
log.Fatalf("failed to create the collector exporter: %v", err)
}
defer func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
if err := exp.Shutdown(ctx); err != nil {
otel.Handle(err)
Expand All @@ -98,7 +100,7 @@ func Example_withTLS() {
tracer := otel.Tracer("test-tracer")

// Then use the OpenTelemetry tracing library, like we normally would.
ctx, span := tracer.Start(context.Background(), "Securely-Talking-To-Collector-Span")
ctx, span := tracer.Start(ctx, "Securely-Talking-To-Collector-Span")
defer span.End()

for i := 0; i < 10; i++ {
Expand Down
Loading