From 38e2d5b2fd4715a9a6928978d12565c151f4c88e Mon Sep 17 00:00:00 2001 From: Manik Magar Date: Tue, 8 Oct 2024 22:36:30 +0530 Subject: [PATCH 1/2] feat: subflow spans with notifications and op (#210) Create subflow spans when processing flow-ref notifications. The current context operation is fixed to use current location so that when context is propagated, it does not always use root context. --- .../api/traces/ComponentEventContext.java | 6 +- .../internal/OpenTelemetryOperations.java | 13 ++-- .../ProcessorTracingInterceptor.java | 29 ++------- .../processor/MuleNotificationProcessor.java | 19 ++++++ .../internal/util/ComponentsUtil.java | 60 +++++++++++++++++ .../AbstractMuleArtifactTraceTest.java | 20 ++++++ .../MuleCoreFlowsNoSpanAllTest.java | 17 ----- .../mule/opentelemetry/MuleCoreFlowsTest.java | 60 ++++++++++------- .../MuleCoreFlowsWithoutInterceptorTest.java | 64 +++++++++++++++++++ src/test/resources/mule-core-flows.xml | 17 +++-- 10 files changed, 232 insertions(+), 73 deletions(-) create mode 100644 src/test/java/com/avioconsulting/mule/opentelemetry/MuleCoreFlowsWithoutInterceptorTest.java diff --git a/src/main/java/com/avioconsulting/mule/opentelemetry/api/traces/ComponentEventContext.java b/src/main/java/com/avioconsulting/mule/opentelemetry/api/traces/ComponentEventContext.java index 2abf75b..2abe33d 100644 --- a/src/main/java/com/avioconsulting/mule/opentelemetry/api/traces/ComponentEventContext.java +++ b/src/main/java/com/avioconsulting/mule/opentelemetry/api/traces/ComponentEventContext.java @@ -99,7 +99,11 @@ default Optional prevContextScopedPath(String path) { * @return String */ default String contextScopedLocation() { - return getEventContextId() + "/" + getLocation(); + return contextScopedLocationFor(getEventContextId(), getLocation()); + } + + static String contextScopedLocationFor(String eventContextId, String location) { + return eventContextId + "/" + location; } } diff --git a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/OpenTelemetryOperations.java b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/OpenTelemetryOperations.java index 59897dd..297f5ad 100644 --- a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/OpenTelemetryOperations.java +++ b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/OpenTelemetryOperations.java @@ -1,7 +1,9 @@ package com.avioconsulting.mule.opentelemetry.internal; +import com.avioconsulting.mule.opentelemetry.api.traces.ComponentEventContext; import com.avioconsulting.mule.opentelemetry.internal.connection.OpenTelemetryConnection; import com.avioconsulting.mule.opentelemetry.internal.util.OpenTelemetryUtil; +import org.mule.runtime.api.component.location.ComponentLocation; import org.mule.runtime.extension.api.annotation.Alias; import org.mule.runtime.extension.api.annotation.param.Connection; import org.mule.runtime.extension.api.annotation.param.Optional; @@ -57,13 +59,16 @@ public Map getTraceContext(@Connection Supplier getCurrentTraceContext( @Connection Supplier openTelemetryConnection, - CorrelationInfo correlationInfo) { + CorrelationInfo correlationInfo, ComponentLocation location) { String eventTransactionId = OpenTelemetryUtil.getEventTransactionId(correlationInfo.getEventId()); if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Getting current context for event Id: {}, correlationId: {}, trace transactionId: {}", - correlationInfo.getEventId(), correlationInfo.getCorrelationId(), eventTransactionId); + LOGGER.debug( + "Getting current context for event Id: {}, correlationId: {}, trace transactionId: {} at location {} in container {}", + correlationInfo.getEventId(), correlationInfo.getCorrelationId(), eventTransactionId, + location.getLocation(), location.getRootContainerName()); } - return openTelemetryConnection.get().getTraceContext(eventTransactionId); + return openTelemetryConnection.get().getTraceContext(eventTransactionId, ComponentEventContext + .contextScopedLocationFor(correlationInfo.getEventId(), location.getRootContainerName())); } /** diff --git a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/interceptor/ProcessorTracingInterceptor.java b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/interceptor/ProcessorTracingInterceptor.java index 6a22723..26d7998 100644 --- a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/interceptor/ProcessorTracingInterceptor.java +++ b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/interceptor/ProcessorTracingInterceptor.java @@ -5,7 +5,6 @@ import com.avioconsulting.mule.opentelemetry.api.traces.TraceComponent; import com.avioconsulting.mule.opentelemetry.internal.processor.MuleNotificationProcessor; import com.avioconsulting.mule.opentelemetry.internal.util.ComponentsUtil; -import io.opentelemetry.api.trace.SpanKind; import org.mule.runtime.api.component.Component; import org.mule.runtime.api.component.location.ComponentLocation; import org.mule.runtime.api.component.location.ConfigurationComponentLocator; @@ -18,12 +17,10 @@ import org.slf4j.LoggerFactory; import javax.annotation.concurrent.ThreadSafe; -import java.util.Collections; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import static com.avioconsulting.mule.opentelemetry.api.sdk.SemanticAttributes.MULE_APP_SCOPE_SUBFLOW_NAME; import static com.avioconsulting.mule.opentelemetry.api.store.TransactionStore.*; import static com.avioconsulting.mule.opentelemetry.internal.util.ComponentsUtil.*; import static com.avioconsulting.mule.opentelemetry.internal.util.OpenTelemetryUtil.getEventTransactionId; @@ -109,29 +106,11 @@ public void before( getLocationParent(location.getLocation())); final String transactionId = getEventTransactionId(event); if (isFlowRef(location)) { - String targetFlowName = traceComponent.getTags().get("mule.app.processor.flowRef.name"); - if (muleNotificationProcessor.getOpenTelemetryConnection().getExpressionManager() - .isExpression(targetFlowName)) { - targetFlowName = muleNotificationProcessor.getOpenTelemetryConnection().getExpressionManager() - .evaluate(targetFlowName, event.asBindingContext()).getValue().toString(); - traceComponent.getTags().put("mule.app.processor.flowRef.name", targetFlowName); - } - Optional subFlowLocation = findLocation( - targetFlowName, - configurationComponentLocator) - .filter(ComponentsUtil::isSubFlow); + Optional subFlowLocation = resolveFlowName( + muleNotificationProcessor.getOpenTelemetryConnection().getExpressionManager(), + traceComponent, event.asBindingContext(), configurationComponentLocator); if (subFlowLocation.isPresent()) { - ComponentLocation subFlowComp = subFlowLocation.get(); - TraceComponent subflowTrace = TraceComponent.of(subFlowComp) - .withTransactionId(traceComponent.getTransactionId()) - .withSpanName(subFlowComp.getLocation()) - .withSpanKind(SpanKind.INTERNAL) - .withTags(Collections.singletonMap(MULE_APP_SCOPE_SUBFLOW_NAME.getKey(), - subFlowComp.getLocation())) - .withStatsCode(traceComponent.getStatusCode()) - .withStartTime(traceComponent.getStartTime()) - .withContext(traceComponent.getContext()) - .withEventContextId(traceComponent.getEventContextId()); + TraceComponent subflowTrace = getTraceComponent(subFlowLocation.get(), traceComponent); muleNotificationProcessor.getOpenTelemetryConnection().addProcessorSpan(subflowTrace, location.getLocation()); event.addVariable(TRACE_CONTEXT_MAP_KEY, diff --git a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/processor/MuleNotificationProcessor.java b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/processor/MuleNotificationProcessor.java index 9b40d15..538177c 100644 --- a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/processor/MuleNotificationProcessor.java +++ b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/processor/MuleNotificationProcessor.java @@ -11,7 +11,9 @@ import com.avioconsulting.mule.opentelemetry.internal.util.ComponentsUtil; import io.opentelemetry.api.trace.SpanKind; import org.mule.runtime.api.component.ComponentIdentifier; +import org.mule.runtime.api.component.location.ComponentLocation; import org.mule.runtime.api.component.location.ConfigurationComponentLocator; +import org.mule.runtime.api.event.Event; import org.mule.runtime.api.metadata.TypedValue; import org.mule.runtime.api.notification.AsyncMessageNotification; import org.mule.runtime.api.notification.EnrichedServerNotification; @@ -26,11 +28,14 @@ import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.function.Supplier; import static com.avioconsulting.mule.opentelemetry.api.sdk.SemanticAttributes.MULE_APP_SCOPE_SUBFLOW_NAME; import static com.avioconsulting.mule.opentelemetry.internal.util.ComponentsUtil.findLocation; +import static com.avioconsulting.mule.opentelemetry.internal.util.ComponentsUtil.getTraceComponent; import static com.avioconsulting.mule.opentelemetry.internal.util.ComponentsUtil.isFlowRef; +import static com.avioconsulting.mule.opentelemetry.internal.util.ComponentsUtil.resolveFlowName; /** * Notification Processor bean. This is injected through registry-bootstrap into @@ -164,6 +169,7 @@ private void processComponentStartSpan(EnrichedServerNotification notification) .withComponentLocation(notification.getComponent().getLocation()); openTelemetryConnection.addProcessorSpan(traceComponent, ComponentsUtil.getLocationParent(notification.getComponent().getLocation().getLocation())); + processFlowRef(traceComponent, notification.getEvent()); } } catch (Exception ex) { logger.error("Error in handling processor start event", ex); @@ -171,6 +177,19 @@ private void processComponentStartSpan(EnrichedServerNotification notification) } } + private void processFlowRef(TraceComponent traceComponent, Event event) { + if (isFlowRef(traceComponent.getComponentLocation())) { + Optional subFlowLocation = resolveFlowName( + getOpenTelemetryConnection().getExpressionManager(), traceComponent, event.asBindingContext(), + configurationComponentLocator); + if (subFlowLocation.isPresent()) { + TraceComponent subflowTrace = getTraceComponent(subFlowLocation.get(), traceComponent); + getOpenTelemetryConnection().addProcessorSpan(subflowTrace, + traceComponent.getComponentLocation().getLocation()); + } + } + } + /** *
    * Finds a {@link ProcessorComponent} for {@link org.mule.runtime.api.component.Component} that caused {@link MessageProcessorNotification} event.
diff --git a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/util/ComponentsUtil.java b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/util/ComponentsUtil.java
index 8dc78cf..48ed0d5 100644
--- a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/util/ComponentsUtil.java
+++ b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/util/ComponentsUtil.java
@@ -1,19 +1,24 @@
 package com.avioconsulting.mule.opentelemetry.internal.util;
 
 import com.avioconsulting.mule.opentelemetry.api.traces.TraceComponent;
+import io.opentelemetry.api.trace.SpanKind;
 import org.mule.runtime.api.component.Component;
 import org.mule.runtime.api.component.ComponentIdentifier;
 import org.mule.runtime.api.component.TypedComponentIdentifier;
 import org.mule.runtime.api.component.location.ComponentLocation;
 import org.mule.runtime.api.component.location.ConfigurationComponentLocator;
 import org.mule.runtime.api.component.location.LocationPart;
+import org.mule.runtime.api.el.BindingContext;
+import org.mule.runtime.core.api.el.ExpressionManager;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 
 import static com.avioconsulting.mule.opentelemetry.api.sdk.SemanticAttributes.MULE_APP_PROCESSOR_NAME;
+import static com.avioconsulting.mule.opentelemetry.api.sdk.SemanticAttributes.MULE_APP_SCOPE_SUBFLOW_NAME;
 import static org.mule.runtime.api.component.TypedComponentIdentifier.ComponentType.FLOW;
 import static org.mule.runtime.api.component.TypedComponentIdentifier.ComponentType.ROUTE;
 import static org.mule.runtime.api.component.TypedComponentIdentifier.ComponentType.SCOPE;
@@ -139,4 +144,59 @@ public static boolean isFlowTypeContainer(ComponentLocation componentLocation) {
   public static boolean isAsyncScope(TypedComponentIdentifier identifier) {
     return SCOPE.equals(identifier.getType()) && identifier.getIdentifier().getName().equals("async");
   }
+
+  /**
+   * Build a Trace component for sub-flow
+   * 
+   * @param subFlowComp
+   * @{@link ComponentLocation} of the target sub-flow
+   * @param traceComponent
+   *            of the flow-ref invoking the sub-flow
+   * @return {@link TraceComponent} for the sub-flow
+   */
+  public static TraceComponent getTraceComponent(ComponentLocation subFlowComp, TraceComponent traceComponent) {
+    return TraceComponent.of(subFlowComp)
+        .withTransactionId(traceComponent.getTransactionId())
+        .withSpanName(subFlowComp.getLocation())
+        .withSpanKind(SpanKind.INTERNAL)
+        .withTags(Collections.singletonMap(MULE_APP_SCOPE_SUBFLOW_NAME.getKey(),
+            subFlowComp.getLocation()))
+        .withStatsCode(traceComponent.getStatusCode())
+        .withStartTime(traceComponent.getStartTime())
+        .withContext(traceComponent.getContext())
+        .withEventContextId(traceComponent.getEventContextId());
+  }
+
+  /**
+   * Resolves the target flow name using given #expressionManager and updates it
+   * in {@link TraceComponent#tags}.
+   * Then it looks up the component location for the resolved flow using given
+   * #configurationComponentLocator.
+   * 
+   * @param expressionManager
+   *            {@link ExpressionManager} to resolve names
+   * @param traceComponent
+   *            {@link TraceComponent} of the flow-ref
+   * @param context
+   *            {@link BindingContext} to use with {@link ExpressionManager}
+   * @param configurationComponentLocator
+   *            {@link ConfigurationComponentLocator} to look up components
+   * @return ComponentLocation of resolved target flow
+   */
+  public static Optional resolveFlowName(ExpressionManager expressionManager,
+      TraceComponent traceComponent, BindingContext context,
+      ConfigurationComponentLocator configurationComponentLocator) {
+    String targetFlowName = traceComponent.getTags().get("mule.app.processor.flowRef.name");
+    if (expressionManager
+        .isExpression(targetFlowName)) {
+      targetFlowName = expressionManager
+          .evaluate(targetFlowName, context).getValue().toString();
+      traceComponent.getTags().put("mule.app.processor.flowRef.name", targetFlowName);
+    }
+    Optional subFlowLocation = findLocation(
+        targetFlowName,
+        configurationComponentLocator)
+            .filter(ComponentsUtil::isSubFlow);
+    return subFlowLocation;
+  }
 }
diff --git a/src/test/java/com/avioconsulting/mule/opentelemetry/AbstractMuleArtifactTraceTest.java b/src/test/java/com/avioconsulting/mule/opentelemetry/AbstractMuleArtifactTraceTest.java
index 3fba44b..6059bbd 100644
--- a/src/test/java/com/avioconsulting/mule/opentelemetry/AbstractMuleArtifactTraceTest.java
+++ b/src/test/java/com/avioconsulting/mule/opentelemetry/AbstractMuleArtifactTraceTest.java
@@ -13,6 +13,7 @@
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.awaitility.Awaitility;
+import org.jetbrains.annotations.NotNull;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -28,11 +29,14 @@
 import java.net.URISyntaxException;
 import java.util.Collections;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.logging.Logger;
+import java.util.stream.Collectors;
 
+import static com.avioconsulting.mule.opentelemetry.internal.opentelemetry.sdk.test.DelegatedLoggingSpanTestExporter.spanQueue;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -90,6 +94,22 @@ protected void withOtelEndpoint() {
     System.setProperty("otel.exporter.otlp.protocol", "http/protobuf");
   }
 
+  @NotNull
+  protected Map> groupSpanByParent() {
+    // Find the root span
+    DelegatedLoggingSpanTestExporter.Span root = spanQueue.stream()
+        .filter(span -> span.getParentSpanContext().getSpanId().equals("0000000000000000")).findFirst().get();
+
+    // Create a lookup of span id and name
+    Map idNameMap = spanQueue.stream().collect(Collectors.toMap(
+        DelegatedLoggingSpanTestExporter.Span::getSpanId, DelegatedLoggingSpanTestExporter.Span::getSpanName));
+
+    return spanQueue.stream()
+        .collect(Collectors.groupingBy(
+            span -> idNameMap.getOrDefault(span.getParentSpanContext().getSpanId(), root.getSpanName()),
+            Collectors.mapping(DelegatedLoggingSpanTestExporter.Span::getSpanName, Collectors.toSet())));
+  }
+
   /**
    * Gets a {@link Logger} used by
    * `io.opentelemetry.exporter.logging.LoggingSpanExporter`.
diff --git a/src/test/java/com/avioconsulting/mule/opentelemetry/MuleCoreFlowsNoSpanAllTest.java b/src/test/java/com/avioconsulting/mule/opentelemetry/MuleCoreFlowsNoSpanAllTest.java
index 40e9cff..596c37f 100644
--- a/src/test/java/com/avioconsulting/mule/opentelemetry/MuleCoreFlowsNoSpanAllTest.java
+++ b/src/test/java/com/avioconsulting/mule/opentelemetry/MuleCoreFlowsNoSpanAllTest.java
@@ -102,21 +102,4 @@ public void testFlowWithGenericSpansOnly() throws Exception {
     System.out.println(groupedSpans);
   }
 
-  @NotNull
-  private Map> groupSpanByParent() {
-    // Find the root span
-    DelegatedLoggingSpanTestExporter.Span root = spanQueue.stream()
-        .filter(span -> span.getParentSpanContext().getSpanId().equals("0000000000000000")).findFirst().get();
-
-    // Create a lookup of span id and name
-    Map idNameMap = spanQueue.stream().collect(Collectors.toMap(
-        DelegatedLoggingSpanTestExporter.Span::getSpanId, DelegatedLoggingSpanTestExporter.Span::getSpanName));
-
-    Map> groupedSpans = spanQueue.stream()
-        .collect(Collectors.groupingBy(
-            span -> idNameMap.getOrDefault(span.getParentSpanContext().getSpanId(), root.getSpanName()),
-            Collectors.mapping(DelegatedLoggingSpanTestExporter.Span::getSpanName, Collectors.toSet())));
-    return groupedSpans;
-  }
-
 }
diff --git a/src/test/java/com/avioconsulting/mule/opentelemetry/MuleCoreFlowsTest.java b/src/test/java/com/avioconsulting/mule/opentelemetry/MuleCoreFlowsTest.java
index f34c65a..4262fd7 100644
--- a/src/test/java/com/avioconsulting/mule/opentelemetry/MuleCoreFlowsTest.java
+++ b/src/test/java/com/avioconsulting/mule/opentelemetry/MuleCoreFlowsTest.java
@@ -1,14 +1,11 @@
 package com.avioconsulting.mule.opentelemetry;
 
-import com.avioconsulting.mule.opentelemetry.internal.opentelemetry.sdk.test.DelegatedLoggingSpanTestExporter;
 import org.assertj.core.api.SoftAssertions;
-import org.jetbrains.annotations.NotNull;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.mule.runtime.core.api.event.CoreEvent;
 
 import java.util.*;
-import java.util.stream.Collectors;
 
 import static com.avioconsulting.mule.opentelemetry.internal.opentelemetry.sdk.test.DelegatedLoggingSpanTestExporter.spanQueue;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -159,23 +156,6 @@ public void testFlowControls_ScatterGather() throws Exception {
     softly.assertAll();
   }
 
-  @NotNull
-  private Map> groupSpanByParent() {
-    // Find the root span
-    DelegatedLoggingSpanTestExporter.Span root = spanQueue.stream()
-        .filter(span -> span.getParentSpanContext().getSpanId().equals("0000000000000000")).findFirst().get();
-
-    // Create a lookup of span id and name
-    Map idNameMap = spanQueue.stream().collect(Collectors.toMap(
-        DelegatedLoggingSpanTestExporter.Span::getSpanId, DelegatedLoggingSpanTestExporter.Span::getSpanName));
-
-    Map> groupedSpans = spanQueue.stream()
-        .collect(Collectors.groupingBy(
-            span -> idNameMap.getOrDefault(span.getParentSpanContext().getSpanId(), root.getSpanName()),
-            Collectors.mapping(DelegatedLoggingSpanTestExporter.Span::getSpanName, Collectors.toSet())));
-    return groupedSpans;
-  }
-
   @Test
   @Ignore
   public void testWithCorrelationId() throws Exception {
@@ -293,7 +273,7 @@ public void testDynamicFlowRefSubFlowPropagation() throws Exception {
     CoreEvent event = flowRunner("call-dynamic-flow-ref")
         .withVariable("targetFlow", "simple-subflow-logger").run();
     await().untilAsserted(() -> assertThat(spanQueue)
-        .hasSize(5));
+        .hasSize(6));
     Map> groupedSpans = groupSpanByParent();
     System.out.println(groupedSpans);
     SoftAssertions softly = new SoftAssertions();
@@ -304,7 +284,43 @@ public void testDynamicFlowRefSubFlowPropagation() throws Exception {
     softly.assertThat(groupedSpans).hasEntrySatisfying("flow-ref:target-flow-call",
         val -> assertThat(val).containsOnly("simple-subflow-logger"));
     softly.assertThat(groupedSpans).hasEntrySatisfying("simple-subflow-logger",
-        val -> assertThat(val).containsOnly("logger:SimpleLogger"));
+        val -> assertThat(val).containsOnly(
+            "get-current-trace-context:simple-subflow-logger:Get Current Trace Context",
+            "logger:simple-subflow-logger:SimpleLogger"));
+    softly.assertAll();
+  }
+
+  @Test
+  public void testFlowRefInvocations_withCurrentContextOperations() throws Exception {
+    CoreEvent event = flowRunner("root-flow").run();
+    await().untilAsserted(() -> assertThat(spanQueue)
+        .hasSize(11));
+    Map> groupedSpans = groupSpanByParent();
+    System.out.println(groupedSpans);
+    SoftAssertions softly = new SoftAssertions();
+    softly.assertThat(groupedSpans)
+        .hasEntrySatisfying("root-flow", val -> assertThat(val)
+            .containsOnly(
+                "root-flow", "logger:root-flow:FirstRootLogger",
+                "get-current-trace-context:root-flow:Get Current Trace Context",
+                "flow-ref:root-flow:simple-flow"));
+    softly.assertThat(groupedSpans)
+        .as("Flow-ref to Flow context propagation due to interceptor.")
+        .hasEntrySatisfying("flow-ref:root-flow:simple-flow", val -> assertThat(val)
+            .containsOnly(
+                "simple-flow"));
+    softly.assertThat(groupedSpans).hasEntrySatisfying("simple-flow",
+        val -> assertThat(val).containsOnly("flow-ref:simple-flow:simple-subflow-logger",
+            "get-current-trace-context:simple-flow:Get Current Trace Context",
+            "logger:simple-flow:FirstSimpleLogger"));
+    softly.assertThat(groupedSpans)
+        .as("Sub-flow span created from the flow-ref interception")
+        .hasEntrySatisfying("flow-ref:simple-flow:simple-subflow-logger",
+            val -> assertThat(val).containsOnly("simple-subflow-logger"));
+    softly.assertThat(groupedSpans).hasEntrySatisfying("simple-subflow-logger",
+        val -> assertThat(val).containsOnly(
+            "get-current-trace-context:simple-subflow-logger:Get Current Trace Context",
+            "logger:simple-subflow-logger:SimpleLogger"));
     softly.assertAll();
   }
 
diff --git a/src/test/java/com/avioconsulting/mule/opentelemetry/MuleCoreFlowsWithoutInterceptorTest.java b/src/test/java/com/avioconsulting/mule/opentelemetry/MuleCoreFlowsWithoutInterceptorTest.java
new file mode 100644
index 0000000..4682a7d
--- /dev/null
+++ b/src/test/java/com/avioconsulting/mule/opentelemetry/MuleCoreFlowsWithoutInterceptorTest.java
@@ -0,0 +1,64 @@
+package com.avioconsulting.mule.opentelemetry;
+
+import org.assertj.core.api.SoftAssertions;
+import org.junit.Test;
+import org.mule.runtime.core.api.event.CoreEvent;
+
+import java.util.Map;
+import java.util.Set;
+
+import static com.avioconsulting.mule.opentelemetry.internal.interceptor.MessageProcessorTracingInterceptorFactory.MULE_OTEL_INTERCEPTOR_PROCESSOR_ENABLE_PROPERTY_NAME;
+import static com.avioconsulting.mule.opentelemetry.internal.opentelemetry.sdk.test.DelegatedLoggingSpanTestExporter.spanQueue;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+public class MuleCoreFlowsWithoutInterceptorTest extends AbstractMuleArtifactTraceTest {
+
+  @Override
+  protected String getConfigFile() {
+    return "mule-core-flows.xml";
+  }
+
+  @Override
+  protected void doSetUpBeforeMuleContextCreation() throws Exception {
+    super.doSetUpBeforeMuleContextCreation();
+    System.setProperty(MULE_OTEL_INTERCEPTOR_PROCESSOR_ENABLE_PROPERTY_NAME,
+        "false");
+  }
+
+  @Override
+  protected void doTearDownAfterMuleContextDispose() throws Exception {
+    super.doTearDownAfterMuleContextDispose();
+    System.clearProperty(MULE_OTEL_INTERCEPTOR_PROCESSOR_ENABLE_PROPERTY_NAME);
+  }
+
+  @Test
+  public void testFlowRefInvocations_withCurrentContextOperations() throws Exception {
+    CoreEvent event = flowRunner("root-flow").run();
+    await().untilAsserted(() -> assertThat(spanQueue)
+        .hasSize(11));
+    Map> groupedSpans = groupSpanByParent();
+    System.out.println(groupedSpans);
+    SoftAssertions softly = new SoftAssertions();
+    softly.assertThat(groupedSpans)
+        .hasEntrySatisfying("root-flow", val -> assertThat(val)
+            .containsOnly(
+                "root-flow", "logger:root-flow:FirstRootLogger",
+                "get-current-trace-context:root-flow:Get Current Trace Context",
+                "flow-ref:root-flow:simple-flow", "simple-flow"));
+    softly.assertThat(groupedSpans).hasEntrySatisfying("simple-flow",
+        val -> assertThat(val).containsOnly("flow-ref:simple-flow:simple-subflow-logger",
+            "get-current-trace-context:simple-flow:Get Current Trace Context",
+            "logger:simple-flow:FirstSimpleLogger"));
+    softly.assertThat(groupedSpans)
+        .as("Sub-flow span created from the flow-ref notification and context")
+        .hasEntrySatisfying("flow-ref:simple-flow:simple-subflow-logger",
+            val -> assertThat(val).containsOnly("simple-subflow-logger"));
+    softly.assertThat(groupedSpans).hasEntrySatisfying("simple-subflow-logger",
+        val -> assertThat(val).containsOnly(
+            "get-current-trace-context:simple-subflow-logger:Get Current Trace Context",
+            "logger:simple-subflow-logger:SimpleLogger"));
+    softly.assertAll();
+  }
+
+}
diff --git a/src/test/resources/mule-core-flows.xml b/src/test/resources/mule-core-flows.xml
index 8ea9258..430d5e5 100644
--- a/src/test/resources/mule-core-flows.xml
+++ b/src/test/resources/mule-core-flows.xml
@@ -29,6 +29,9 @@ http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/htt
 	
 		
 	
+	
+		
+	
 	
 		
 			
@@ -199,13 +202,19 @@ http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/htt
 		
 	
 
-
+	
+		
+		
+		
+	
 	
-		
-		
+		
+		
+		
 	
 	
-		
+		
+		
 	
 	
 		

From e1b78c6230345149f6a0b9d3acaa583a9364b4b4 Mon Sep 17 00:00:00 2001
From: Manik Magar 
Date: Wed, 9 Oct 2024 14:38:51 +0530
Subject: [PATCH 2/2] feat: do not set global otel instance (#211)

* feat: do not set global otel instance

Do not set the static or global instance of opentelemetry. This removes
the usage of Mule SDK Connection providers for operations since
connectivity is driven by the configuration itself.

* fix(test): duplicate logging configuration
---
 pom.xml                                       |  2 +-
 .../internal/OpenTelemetryOperations.java     | 37 ++++++++-------
 .../OpenTelemetryExtensionConfiguration.java  | 10 +++--
 .../connection/OpenTelemetryConnection.java   | 34 ++------------
 .../OpenTelemetryConnectionProvider.java      | 45 -------------------
 src/test/resources/log4j2-test.xml            | 24 +++-------
 6 files changed, 35 insertions(+), 117 deletions(-)
 delete mode 100644 src/main/java/com/avioconsulting/mule/opentelemetry/internal/connection/OpenTelemetryConnectionProvider.java

diff --git a/pom.xml b/pom.xml
index b85f92d..f67d2ba 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
     4.0.0
     com.avioconsulting.mule
     mule-opentelemetry-module
-    2.3.1-SNAPSHOT
+    2.4.0-SNAPSHOT
     mule-extension
 
     
diff --git a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/OpenTelemetryOperations.java b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/OpenTelemetryOperations.java
index 297f5ad..b62f379 100644
--- a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/OpenTelemetryOperations.java
+++ b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/OpenTelemetryOperations.java
@@ -1,11 +1,11 @@
 package com.avioconsulting.mule.opentelemetry.internal;
 
 import com.avioconsulting.mule.opentelemetry.api.traces.ComponentEventContext;
-import com.avioconsulting.mule.opentelemetry.internal.connection.OpenTelemetryConnection;
+import com.avioconsulting.mule.opentelemetry.internal.config.OpenTelemetryExtensionConfiguration;
 import com.avioconsulting.mule.opentelemetry.internal.util.OpenTelemetryUtil;
 import org.mule.runtime.api.component.location.ComponentLocation;
 import org.mule.runtime.extension.api.annotation.Alias;
-import org.mule.runtime.extension.api.annotation.param.Connection;
+import org.mule.runtime.extension.api.annotation.param.Config;
 import org.mule.runtime.extension.api.annotation.param.Optional;
 import org.mule.runtime.extension.api.annotation.param.display.DisplayName;
 import org.mule.runtime.extension.api.annotation.param.display.Summary;
@@ -16,7 +16,6 @@
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
-import java.util.function.Supplier;
 
 public class OpenTelemetryOperations {
 
@@ -26,8 +25,8 @@ public class OpenTelemetryOperations {
    * Deprecated: Use Get Current Trace Context instead. When OTEL_TRACE_CONTEXT
    * does not pre-exist, there is no way for users to get current transaction id.
    *
-   * @param openTelemetryConnection
-   *            {@link OpenTelemetryConnection} Instance
+   * @param config
+   *            {@link OpenTelemetryExtensionConfiguration} Instance
    * @param traceTransactionId
    *            provided by user
    * @param correlationInfo
@@ -37,18 +36,18 @@ public class OpenTelemetryOperations {
   @DisplayName("Get Trace Context")
   @Alias("get-trace-context")
   @Deprecated(message = "Use Get Current Trace Context instead. When OTEL_TRACE_CONTEXT does not pre-exist, there is no way for users to get current transaction id.", since = "2.3.0", toRemoveIn = "3.0.0")
-  public Map getTraceContext(@Connection Supplier openTelemetryConnection,
+  public Map getTraceContext(@Config OpenTelemetryExtensionConfiguration config,
       @DisplayName("Trace Transaction Id") @Optional(defaultValue = "#[vars.OTEL_TRACE_CONTEXT.TRACE_TRANSACTION_ID]") ParameterResolver traceTransactionId,
       CorrelationInfo correlationInfo) {
     LOGGER.warn("get-trace-context has been deprecated. Use get-current-trace-context instead");
-    return openTelemetryConnection.get().getTraceContext(traceTransactionId.resolve());
+    return config.getOpenTelemetryConnection().getTraceContext(traceTransactionId.resolve());
   }
 
   /**
    * Get the trace context for current trace transaction.
    *
-   * @param openTelemetryConnection
-   *            {@link OpenTelemetryConnection} Instance
+   * @param config
+   *            {@link OpenTelemetryExtensionConfiguration} Instance
    * @param correlationInfo
    *            {@link CorrelationInfo} (injected by runtime) to extract the
    *            current event id
@@ -58,7 +57,7 @@ public Map getTraceContext(@Connection Supplier getCurrentTraceContext(
-      @Connection Supplier openTelemetryConnection,
+      @Config OpenTelemetryExtensionConfiguration config,
       CorrelationInfo correlationInfo, ComponentLocation location) {
     String eventTransactionId = OpenTelemetryUtil.getEventTransactionId(correlationInfo.getEventId());
     if (LOGGER.isDebugEnabled()) {
@@ -67,7 +66,7 @@ public Map getCurrentTraceContext(
           correlationInfo.getEventId(), correlationInfo.getCorrelationId(), eventTransactionId,
           location.getLocation(), location.getRootContainerName());
     }
-    return openTelemetryConnection.get().getTraceContext(eventTransactionId, ComponentEventContext
+    return config.getOpenTelemetryConnection().getTraceContext(eventTransactionId, ComponentEventContext
         .contextScopedLocationFor(correlationInfo.getEventId(), location.getRootContainerName()));
   }
 
@@ -75,8 +74,8 @@ public Map getCurrentTraceContext(
    * Deprecated: Use addTransactionTags instead. When OTEL_TRACE_CONTEXT does not
    * pre-exist,there is no way for users to get current transaction id.
    *
-   * @param openTelemetryConnection
-   *            {@link OpenTelemetryConnection} provided by the SDK
+   * @param config
+   *            {@link OpenTelemetryExtensionConfiguration} provided by the SDK
    * @param tags
    *            {@link Map} of {@link String} Keys and {@link String} Values
    *            containing the tags. Behavior of null values in the map is
@@ -86,12 +85,12 @@ public Map getCurrentTraceContext(
    */
   @DisplayName("Add Custom Tags")
   @Deprecated(message = "Use addTransactionTags instead. When OTEL_TRACE_CONTEXT does not pre-exist, there is no way for users to get current transaction id.", since = "2.3.0", toRemoveIn = "3.0.0")
-  public void addCustomTags(@Connection Supplier openTelemetryConnection,
+  public void addCustomTags(@Config OpenTelemetryExtensionConfiguration config,
       @DisplayName("Trace Transaction Id") @Optional(defaultValue = "#[vars.OTEL_TRACE_CONTEXT.TRACE_TRANSACTION_ID]") ParameterResolver traceTransactionId,
       Map tags,
       CorrelationInfo correlationInfo) {
     LOGGER.warn("add-custom-tags has been deprecated. Use add-transaction-tags instead.");
-    openTelemetryConnection.get().getTransactionStore().addTransactionTags(traceTransactionId.resolve(),
+    config.getOpenTelemetryConnection().getTransactionStore().addTransactionTags(traceTransactionId.resolve(),
         "custom",
         tags);
   }
@@ -104,8 +103,8 @@ public void addCustomTags(@Connection Supplier openTele
    * If the transaction's root span previously contained a mapping for the key,
    * the old value is replaced by the new value.
    *
-   * @param openTelemetryConnection
-   *            {@link OpenTelemetryConnection} provided by the SDK
+   * @param config
+   *            {@link OpenTelemetryExtensionConfiguration} provided by the SDK
    * @param tags
    *            {@link Map} of {@link String} Keys and {@link String} Values
    *            containing the tags. Behavior of null values in the map is
@@ -114,7 +113,7 @@ public void addCustomTags(@Connection Supplier openTele
    *            {@link CorrelationInfo} from the runtime
    */
   @DisplayName("Add Transaction Tags")
-  public void addTransactionTags(@Connection Supplier openTelemetryConnection,
+  public void addTransactionTags(@Config OpenTelemetryExtensionConfiguration config,
       Map tags,
       CorrelationInfo correlationInfo) {
     String eventTransactionId = OpenTelemetryUtil.getEventTransactionId(correlationInfo.getEventId());
@@ -122,7 +121,7 @@ public void addTransactionTags(@Connection Supplier ope
       LOGGER.debug("Add Transaction Tags for event Id: {}, correlationId: {}, trace transactionId: {}",
           correlationInfo.getEventId(), correlationInfo.getCorrelationId(), eventTransactionId);
     }
-    openTelemetryConnection.get().getTransactionStore().addTransactionTags(eventTransactionId,
+    config.getOpenTelemetryConnection().getTransactionStore().addTransactionTags(eventTransactionId,
         "custom",
         tags);
   }
diff --git a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/config/OpenTelemetryExtensionConfiguration.java b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/config/OpenTelemetryExtensionConfiguration.java
index 543de0b..8bfc66c 100644
--- a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/config/OpenTelemetryExtensionConfiguration.java
+++ b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/config/OpenTelemetryExtensionConfiguration.java
@@ -11,7 +11,6 @@
 import com.avioconsulting.mule.opentelemetry.api.providers.OpenTelemetryMetricsConfigSupplier;
 import com.avioconsulting.mule.opentelemetry.internal.OpenTelemetryOperations;
 import com.avioconsulting.mule.opentelemetry.internal.connection.OpenTelemetryConnection;
-import com.avioconsulting.mule.opentelemetry.internal.connection.OpenTelemetryConnectionProvider;
 import com.avioconsulting.mule.opentelemetry.internal.notifications.listeners.AsyncMessageNotificationListener;
 import com.avioconsulting.mule.opentelemetry.internal.notifications.listeners.MetricEventNotificationListener;
 import com.avioconsulting.mule.opentelemetry.internal.notifications.listeners.MuleMessageProcessorNotificationListener;
@@ -27,7 +26,6 @@
 import org.mule.runtime.extension.api.annotation.Configuration;
 import org.mule.runtime.extension.api.annotation.Expression;
 import org.mule.runtime.extension.api.annotation.Operations;
-import org.mule.runtime.extension.api.annotation.connectivity.ConnectionProviders;
 import org.mule.runtime.extension.api.annotation.param.Optional;
 import org.mule.runtime.extension.api.annotation.param.Parameter;
 import org.mule.runtime.extension.api.annotation.param.ParameterGroup;
@@ -42,7 +40,6 @@
 import javax.inject.Inject;
 
 @Operations(OpenTelemetryOperations.class)
-@ConnectionProviders(OpenTelemetryConnectionProvider.class)
 @Configuration
 public class OpenTelemetryExtensionConfiguration
     implements Startable, Stoppable, OpenTelemetryConfiguration, OpenTelemetryMetricsConfigSupplier {
@@ -60,6 +57,7 @@ public class OpenTelemetryExtensionConfiguration
   @Inject
   private ExpressionManager expressionManager;
   private AppIdentifier appIdentifier;
+  private OpenTelemetryConnection openTelemetryConnection;
 
   public HttpService getHttpService() {
     return httpService;
@@ -179,6 +177,10 @@ public ExpressionManager getExpressionManager() {
     return expressionManager;
   }
 
+  public OpenTelemetryConnection getOpenTelemetryConnection() {
+    return openTelemetryConnection;
+  }
+
   @Override
   public String getConfigName() {
     return configName;
@@ -194,7 +196,7 @@ public String getConfigName() {
   public void start() throws MuleException {
     logger.info("Initiating otel config - '{}'", getConfigName());
     appIdentifier = AppIdentifier.fromEnvironment(expressionManager);
-    OpenTelemetryConnection openTelemetryConnection = OpenTelemetryConnection
+    openTelemetryConnection = OpenTelemetryConnection
         .getInstance(new OpenTelemetryConfigWrapper(this));
     muleNotificationProcessor.init(openTelemetryConnection,
         getTraceLevelConfiguration());
diff --git a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/connection/OpenTelemetryConnection.java b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/connection/OpenTelemetryConnection.java
index 9164340..387e42c 100644
--- a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/connection/OpenTelemetryConnection.java
+++ b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/connection/OpenTelemetryConnection.java
@@ -14,9 +14,7 @@
 import com.avioconsulting.mule.opentelemetry.internal.util.OpenTelemetryUtil;
 import com.avioconsulting.mule.opentelemetry.internal.util.PropertiesUtil;
 import com.avioconsulting.mule.opentelemetry.internal.util.ServiceProviderUtil;
-import io.opentelemetry.api.GlobalOpenTelemetry;
 import io.opentelemetry.api.OpenTelemetry;
-import io.opentelemetry.api.events.GlobalEventEmitterProvider;
 import io.opentelemetry.api.metrics.Meter;
 import io.opentelemetry.api.trace.*;
 import io.opentelemetry.context.Context;
@@ -72,7 +70,7 @@ public class OpenTelemetryConnection implements TraceContextHandler {
 
   private static final String INSTRUMENTATION_NAME = "mule-opentelemetry-module-DEV";
   private final TransactionStore transactionStore;
-  private static OpenTelemetryConnection openTelemetryConnection;
+  private OpenTelemetryConnection openTelemetryConnection;
   private final OpenTelemetry openTelemetry;
   private final Tracer tracer;
   private boolean turnOffTracing = false;
@@ -110,7 +108,6 @@ private OpenTelemetryConnection(OpenTelemetryConfigWrapper openTelemetryConfigWr
       expressionManager = openTelemetryConfigWrapper.getOpenTelemetryConfiguration().getExpressionManager();
     }
     builder.setServiceClassLoader(AutoConfiguredOpenTelemetrySdkBuilder.class.getClassLoader());
-    builder.setResultAsGlobal();
     if (!turnOffMetrics)
       metricsProvider.initialise(appIdentifier);
     openTelemetry = builder.build().getOpenTelemetrySdk();
@@ -126,6 +123,7 @@ private OpenTelemetryConnection(OpenTelemetryConfigWrapper openTelemetryConfigWr
     tracer = openTelemetry.getTracer(instrumentationName, instrumentationVersion);
     transactionStore = InMemoryTransactionStore.getInstance();
     PropertiesUtil.init();
+    openTelemetryConnection = this;
   }
 
   private void installOpenTelemetryLogger() {
@@ -159,41 +157,17 @@ public OpenTelemetryMetricsProviderCollection getMetricsProviders() {
     return metricsProviders;
   }
 
-  /**
-   * {@link Supplier} to use with
-   * {@link ConnectionProvider} where lazy
-   * initialization is required.
-   * 
-   * @return a non-null {@code Supplier}
-   */
-  public static Supplier supplier() {
-    return () -> openTelemetryConnection;
-  }
-
   /**
    * This is for tests to reset the static instance in-between the tests.
    * Reset Global OpenTelemetry instances.
    */
   public static void resetForTest() {
-    if (openTelemetryConnection != null && openTelemetryConnection.metricsProvider != null) {
-      openTelemetryConnection.metricsProvider.stop();
-      openTelemetryConnection.getMetricsProviders().stop();
-    }
-    GlobalOpenTelemetry.resetForTest();
-    GlobalEventEmitterProvider.resetForTest();
-    openTelemetryConnection = null;
+
   }
 
   public static synchronized OpenTelemetryConnection getInstance(
       OpenTelemetryConfigWrapper openTelemetryConfigWrapper) {
-    if (openTelemetryConnection == null) {
-      openTelemetryConnection = new OpenTelemetryConnection(openTelemetryConfigWrapper);
-    }
-    return openTelemetryConnection;
-  }
-
-  public static synchronized OpenTelemetryConnection getInstance() {
-    return openTelemetryConnection;
+    return new OpenTelemetryConnection(openTelemetryConfigWrapper);
   }
 
   /**
diff --git a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/connection/OpenTelemetryConnectionProvider.java b/src/main/java/com/avioconsulting/mule/opentelemetry/internal/connection/OpenTelemetryConnectionProvider.java
deleted file mode 100644
index b658fbc..0000000
--- a/src/main/java/com/avioconsulting/mule/opentelemetry/internal/connection/OpenTelemetryConnectionProvider.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package com.avioconsulting.mule.opentelemetry.internal.connection;
-
-import org.mule.runtime.api.connection.CachedConnectionProvider;
-import org.mule.runtime.api.connection.ConnectionException;
-import org.mule.runtime.api.connection.ConnectionValidationResult;
-import org.mule.runtime.api.notification.NotificationListenerRegistry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.inject.Inject;
-import java.util.function.Supplier;
-
-public class OpenTelemetryConnectionProvider
-    implements CachedConnectionProvider> {
-
-  private final Logger LOGGER = LoggerFactory.getLogger(OpenTelemetryConnectionProvider.class);
-
-  public static final String INSTRUMENTATION_VERSION = "0.0.1";
-  public static final String INSTRUMENTATION_NAME = "com.avioconsulting.mule.tracing";
-
-  @Inject
-  NotificationListenerRegistry notificationListenerRegistry;
-
-  @Override
-  public Supplier connect() throws ConnectionException {
-    return OpenTelemetryConnection.supplier();
-  }
-
-  @Override
-  public void disconnect(Supplier connection) {
-    try {
-      OpenTelemetryConnection openTelemetryConnection = connection.get();
-      if (openTelemetryConnection != null)
-        openTelemetryConnection.invalidate();
-    } catch (Exception e) {
-      LOGGER.error(
-          "Error while disconnecting OpenTelemetry: " + e.getMessage(), e);
-    }
-  }
-
-  @Override
-  public ConnectionValidationResult validate(Supplier connection) {
-    return ConnectionValidationResult.success();
-  }
-}
diff --git a/src/test/resources/log4j2-test.xml b/src/test/resources/log4j2-test.xml
index 9370e6a..a519df4 100644
--- a/src/test/resources/log4j2-test.xml
+++ b/src/test/resources/log4j2-test.xml
@@ -6,24 +6,12 @@
         
     
     
-        
-            
-        
-        
-            
-        
-        
-            
-        
-        
-            
-        
-        
-            
-        
-        
-            
-        
+        
+        
+        
+        
+        
+