Skip to content

Commit

Permalink
fix: dynamic flow ref target fixes #195 (#196)
Browse files Browse the repository at this point in the history
  • Loading branch information
manikmagar authored Aug 20, 2024
1 parent 679c056 commit 3e1ea77
Show file tree
Hide file tree
Showing 10 changed files with 82 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.avioconsulting.mule.opentelemetry.api.config.OpenTelemetryResource;
import com.avioconsulting.mule.opentelemetry.api.config.SpanProcessorConfiguration;
import com.avioconsulting.mule.opentelemetry.api.config.exporter.OpenTelemetryExporter;
import org.mule.runtime.core.api.el.ExpressionManager;
import org.mule.runtime.http.api.client.HttpClient;
import org.mule.runtime.http.api.client.HttpClientConfiguration;

Expand Down Expand Up @@ -44,4 +45,8 @@ public HttpClient getHttpClient(String name) {
HttpClientConfiguration clientConfiguration = new HttpClientConfiguration.Builder().setName(name).build();
return getOpenTelemetryConfiguration().getHttpService().getClientFactory().create(clientConfiguration);
}

public ExpressionManager getExpressionManager() {
return openTelemetryConfiguration.getExpressionManager();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.avioconsulting.mule.opentelemetry.api.config.SpanProcessorConfiguration;
import com.avioconsulting.mule.opentelemetry.api.config.TraceLevelConfiguration;
import com.avioconsulting.mule.opentelemetry.api.providers.OpenTelemetryMetricsConfigProvider;
import org.mule.runtime.core.api.el.ExpressionManager;
import org.mule.runtime.http.api.HttpService;

public interface OpenTelemetryConfiguration {
Expand All @@ -27,4 +28,6 @@ public interface OpenTelemetryConfiguration {
AppIdentifier getAppIdentifier();

OpenTelemetryMetricsConfigProvider getMetricsConfigProvider();

ExpressionManager getExpressionManager();
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ public OpenTelemetryExtensionConfiguration setSpanProcessorConfiguration(
return this;
}

public ExpressionManager getExpressionManager() {
return expressionManager;
}

@Override
public String getConfigName() {
return configName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
import org.mule.runtime.api.component.location.ComponentLocation;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.message.Error;
import org.mule.runtime.core.api.el.ExpressionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -55,7 +57,7 @@ public class OpenTelemetryConnection implements TraceContextHandler {
private final Logger logger = LoggerFactory.getLogger(OpenTelemetryConnection.class);
private OpenTelemetryMetricsConfigProvider metricsProvider;
private AppIdentifier appIdentifier;

private ExpressionManager expressionManager;
/**
* Instrumentation version must be picked from the module's artifact version.
* This is a fallback for any dev testing.
Expand Down Expand Up @@ -102,6 +104,7 @@ private OpenTelemetryConnection(OpenTelemetryConfigWrapper openTelemetryConfigWr
turnOffTracing = openTelemetryConfigWrapper.isTurnOffTracing();
appIdentifier = openTelemetryConfigWrapper.getOpenTelemetryConfiguration().getAppIdentifier();
metricsProvider = openTelemetryConfigWrapper.getOpenTelemetryConfiguration().getMetricsConfigProvider();
expressionManager = openTelemetryConfigWrapper.getOpenTelemetryConfiguration().getExpressionManager();
}
builder.setServiceClassLoader(AutoConfiguredOpenTelemetrySdkBuilder.class.getClassLoader());
builder.setResultAsGlobal();
Expand Down Expand Up @@ -147,7 +150,7 @@ public OpenTelemetryMetricsProviderCollection getMetricsProviders() {

/**
* {@link Supplier} to use with
* {@link org.mule.runtime.api.connection.ConnectionProvider} where lazy
* {@link ConnectionProvider} where lazy
* initialization is required.
*
* @return a non-null {@code Supplier<OpenTelemetryConnection>}
Expand Down Expand Up @@ -280,6 +283,10 @@ public Meter get(String instrumentationScopeName) {
return openTelemetry.meterBuilder(instrumentationScopeName).build();
}

public ExpressionManager getExpressionManager() {
return expressionManager;
}

public static enum HashMapTextMapSetter implements TextMapSetter<Map<String, String>> {
INSTANCE;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,15 @@ public void before(
getLocationParent(location.getLocation()));
final String transactionId = getEventTransactionId(event);
if (isFlowRef(location)) {
String targetFlowName = traceComponent.getTags().get("mule.app.processor.flowRef.name");
if (muleNotificationProcessor.getOpenTelemetryConnection().getExpressionManager()
.isExpression(targetFlowName)) {
targetFlowName = muleNotificationProcessor.getOpenTelemetryConnection().getExpressionManager()
.evaluate(targetFlowName, event.asBindingContext()).getValue().toString();
traceComponent.getTags().put("mule.app.processor.flowRef.name", targetFlowName);
}
Optional<ComponentLocation> subFlowLocation = findLocation(
traceComponent.getTags().get("mule.app.processor.flowRef.name"),
targetFlowName,
configurationComponentLocator)
.filter(ComponentsUtil::isSubFlow);
if (subFlowLocation.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,15 @@ public void handleProcessorEndEvent(EnrichedServerNotification notification) {
notification.getEvent().getError().orElse(null));

if (isFlowRef(notification.getComponent().getLocation())) {
findLocation(traceComponent.getTags().get("mule.app.processor.flowRef.name"),
String targetFlowName = traceComponent.getTags().get("mule.app.processor.flowRef.name");
if (openTelemetryConnection.getExpressionManager().isExpression(targetFlowName)) {
logger.trace("Resolving expression '{}'", targetFlowName);
targetFlowName = openTelemetryConnection.getExpressionManager()
.evaluate(targetFlowName, notification.getEvent().asBindingContext()).getValue()
.toString();
logger.trace("Resolved to value '{}'", targetFlowName);
}
findLocation(targetFlowName,
configurationComponentLocator)
.filter(ComponentsUtil::isSubFlow)
.ifPresent(subFlowComp -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,4 +269,43 @@ public void testFlowErrorPropagationSpans() throws Exception {
.containsEntry("error.type", "org.mule.runtime.core.internal.exception.MessagingException");
}

@Test
public void testDynamicFlowRefFlowPropagation() throws Exception {
CoreEvent event = flowRunner("call-dynamic-flow-ref").withVariable("targetFlow", "simple-flow-to-flow").run();
await().untilAsserted(() -> assertThat(spanQueue)
.hasSize(8));
Map<Object, Set<String>> groupedSpans = groupSpanByParent();
System.out.println(groupedSpans);
SoftAssertions softly = new SoftAssertions();
softly.assertThat(groupedSpans)
.hasEntrySatisfying("call-dynamic-flow-ref", val -> assertThat(val)
.containsOnly(
"logger:ParentFirstLogger", "call-dynamic-flow-ref", "flow-ref:target-flow-call"));
softly.assertThat(groupedSpans).hasEntrySatisfying("flow-ref:target-flow-call",
val -> assertThat(val).containsOnly("simple-flow-to-flow"));
softly.assertThat(groupedSpans).hasEntrySatisfying("simple-flow-to-flow",
val -> assertThat(val).containsOnly("logger:FirstSimpleLogger", "flow-ref:flow-ref"));
softly.assertAll();
}

@Test
public void testDynamicFlowRefSubFlowPropagation() throws Exception {
CoreEvent event = flowRunner("call-dynamic-flow-ref")
.withVariable("targetFlow", "simple-subflow-logger").run();
await().untilAsserted(() -> assertThat(spanQueue)
.hasSize(5));
Map<Object, Set<String>> groupedSpans = groupSpanByParent();
System.out.println(groupedSpans);
SoftAssertions softly = new SoftAssertions();
softly.assertThat(groupedSpans)
.hasEntrySatisfying("call-dynamic-flow-ref", val -> assertThat(val)
.containsOnly(
"logger:ParentFirstLogger", "call-dynamic-flow-ref", "flow-ref:target-flow-call"));
softly.assertThat(groupedSpans).hasEntrySatisfying("flow-ref:target-flow-call",
val -> assertThat(val).containsOnly("simple-subflow-logger"));
softly.assertThat(groupedSpans).hasEntrySatisfying("simple-subflow-logger",
val -> assertThat(val).containsOnly("logger:SimpleLogger"));
softly.assertAll();
}

}
2 changes: 1 addition & 1 deletion src/test/resources/log4j2-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<Logger name="org.mule.service.http.impl.service.HttpMessageLogger" level="INFO">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="com.avioconsulting.mule.opentelemetry.internal.interceptor" level="INFO">
<Logger name="com.avioconsulting.mule.opentelemetry" level="INFO">
<AppenderRef ref="Console"/>
</Logger>
<Logger name="org.mule.runtime.core" level="WARN">
Expand Down
28 changes: 0 additions & 28 deletions src/test/resources/log4j2.xml

This file was deleted.

4 changes: 4 additions & 0 deletions src/test/resources/mule-core-flows.xml
Original file line number Diff line number Diff line change
Expand Up @@ -237,4 +237,8 @@ http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/htt
</async>
</flow>

<flow name="call-dynamic-flow-ref">
<logger level="INFO" doc:name="ParentFirstLogger" />
<flow-ref name="#[vars.targetFlow]" doc:name="target-flow-call"/>
</flow>
</mule>

0 comments on commit 3e1ea77

Please sign in to comment.