From 4f697dc7fe5f81bfdd203b5215f3da281825c046 Mon Sep 17 00:00:00 2001 From: Jay DeLuca Date: Wed, 18 Dec 2024 14:06:24 -0500 Subject: [PATCH] Convert aws sdk 2.2 Sqs Suppress Receive Spans Tests (#12895) --- .../Aws2SqsSuppressReceiveSpansTest.groovy | 27 -- .../v2_2/Aws2SqsSuppressReceiveSpansTest.java | 40 ++ .../Aws2SqsSuppressReceiveSpansTest.groovy | 109 ----- .../v2_2/Aws2SqsDefaultPropagatorTest.java | 2 +- ...ressReceiveSpansDefaultPropagatorTest.java | 48 +++ .../v2_2/Aws2SqsSuppressReceiveSpansTest.java | 54 +++ ...ansW3cPropagatorAndXrayPropagatorTest.java | 25 ++ ...SuppressReceiveSpansW3cPropagatorTest.java | 27 ++ ...SqsW3cPropagatorAndXrayPropagatorTest.java | 2 +- .../awssdk/v2_2/Aws2SqsW3cPropagatorTest.java | 4 +- ...ractAws2SqsSuppressReceiveSpansTest.groovy | 393 ------------------ .../awssdk/v2_2/AbstractAws2SqsBaseTest.java | 275 ++++++++++++ ...stractAws2SqsSuppressReceiveSpansTest.java | 161 +++++++ .../v2_2/AbstractAws2SqsTracingTest.java | 283 +------------ 14 files changed, 654 insertions(+), 796 deletions(-) delete mode 100644 instrumentation/aws-sdk/aws-sdk-2.2/javaagent/src/test/groovy/Aws2SqsSuppressReceiveSpansTest.groovy create mode 100644 instrumentation/aws-sdk/aws-sdk-2.2/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/awssdk/v2_2/Aws2SqsSuppressReceiveSpansTest.java delete mode 100644 instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsSuppressReceiveSpansTest.groovy create mode 100644 instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/java/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsSuppressReceiveSpansDefaultPropagatorTest.java create mode 100644 instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/java/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsSuppressReceiveSpansTest.java create mode 100644 instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/java/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsSuppressReceiveSpansW3cPropagatorAndXrayPropagatorTest.java create mode 100644 instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/java/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsSuppressReceiveSpansW3cPropagatorTest.java delete mode 100644 instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsSuppressReceiveSpansTest.groovy create mode 100644 instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsBaseTest.java create mode 100644 instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsSuppressReceiveSpansTest.java diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/javaagent/src/test/groovy/Aws2SqsSuppressReceiveSpansTest.groovy b/instrumentation/aws-sdk/aws-sdk-2.2/javaagent/src/test/groovy/Aws2SqsSuppressReceiveSpansTest.groovy deleted file mode 100644 index e7f2128682d6..000000000000 --- a/instrumentation/aws-sdk/aws-sdk-2.2/javaagent/src/test/groovy/Aws2SqsSuppressReceiveSpansTest.groovy +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.opentelemetry.instrumentation.awssdk.v2_2.AbstractAws2SqsSuppressReceiveSpansTest -import io.opentelemetry.instrumentation.test.AgentTestTrait -import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration -import software.amazon.awssdk.services.sqs.SqsAsyncClient -import software.amazon.awssdk.services.sqs.SqsClient - -class Aws2SqsSuppressReceiveSpansTest extends AbstractAws2SqsSuppressReceiveSpansTest implements AgentTestTrait { - @Override - ClientOverrideConfiguration.Builder createOverrideConfigurationBuilder() { - return ClientOverrideConfiguration.builder() - } - - @Override - SqsClient configureSqsClient(SqsClient sqsClient) { - return sqsClient - } - - @Override - SqsAsyncClient configureSqsClient(SqsAsyncClient sqsClient) { - return sqsClient - } -} diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/awssdk/v2_2/Aws2SqsSuppressReceiveSpansTest.java b/instrumentation/aws-sdk/aws-sdk-2.2/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/awssdk/v2_2/Aws2SqsSuppressReceiveSpansTest.java new file mode 100644 index 000000000000..e230874a3e20 --- /dev/null +++ b/instrumentation/aws-sdk/aws-sdk-2.2/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/awssdk/v2_2/Aws2SqsSuppressReceiveSpansTest.java @@ -0,0 +1,40 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.awssdk.v2_2; + +import io.opentelemetry.instrumentation.awssdk.v2_2.AbstractAws2SqsSuppressReceiveSpansTest; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import org.junit.jupiter.api.extension.RegisterExtension; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.SqsClient; + +class Aws2SqsSuppressReceiveSpansTest extends AbstractAws2SqsSuppressReceiveSpansTest { + + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @Override + protected InstrumentationExtension getTesting() { + return testing; + } + + @Override + protected SqsClient configureSqsClient(SqsClient sqsClient) { + return sqsClient; + } + + @Override + protected SqsAsyncClient configureSqsClient(SqsAsyncClient sqsClient) { + return sqsClient; + } + + @Override + protected ClientOverrideConfiguration.Builder createOverrideConfigurationBuilder() { + return ClientOverrideConfiguration.builder(); + } +} diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsSuppressReceiveSpansTest.groovy b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsSuppressReceiveSpansTest.groovy deleted file mode 100644 index af05d2ee6d8c..000000000000 --- a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsSuppressReceiveSpansTest.groovy +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.instrumentation.awssdk.v2_2 - -import io.opentelemetry.instrumentation.test.LibraryTestTrait -import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration -import software.amazon.awssdk.services.sqs.SqsAsyncClient -import software.amazon.awssdk.services.sqs.SqsClient - -abstract class Aws2SqsSuppressReceiveSpansTest extends AbstractAws2SqsSuppressReceiveSpansTest implements LibraryTestTrait { - static AwsSdkTelemetry telemetry - - def setupSpec() { - def telemetryBuilder = AwsSdkTelemetry.builder(getOpenTelemetry()) - .setCaptureExperimentalSpanAttributes(true) - configure(telemetryBuilder) - telemetry = telemetryBuilder.build() - } - - abstract void configure(AwsSdkTelemetryBuilder telemetryBuilder) - - @Override - ClientOverrideConfiguration.Builder createOverrideConfigurationBuilder() { - return ClientOverrideConfiguration.builder() - .addExecutionInterceptor( - telemetry.newExecutionInterceptor()) - } - - @Override - SqsClient configureSqsClient(SqsClient sqsClient) { - return telemetry.wrap(sqsClient) - } - - @Override - SqsAsyncClient configureSqsClient(SqsAsyncClient sqsClient) { - return telemetry.wrap(sqsClient) - } -} - -class Aws2SqsSuppressReceiveSpansDefaultPropagatorTest extends Aws2SqsSuppressReceiveSpansTest { - - @Override - void configure(AwsSdkTelemetryBuilder telemetryBuilder) {} - - @Override - boolean isSqsAttributeInjectionEnabled() { - false - } - - def "duplicate tracing interceptor"() { - setup: - def builder = SqsClient.builder() - configureSdkClient(builder) - def overrideConfiguration = ClientOverrideConfiguration.builder() - .addExecutionInterceptor(telemetry.newExecutionInterceptor()) - .addExecutionInterceptor(telemetry.newExecutionInterceptor()) - .build() - builder.overrideConfiguration(overrideConfiguration) - def client = configureSqsClient(builder.build()) - - client.createQueue(createQueueRequest) - - when: - client.sendMessage(sendMessageRequest) - - def resp = client.receiveMessage(receiveMessageRequest) - - then: - resp.messages().size() == 1 - resp.messages.each {message -> runWithSpan("process child") {}} - assertSqsTraces() - } -} - -class Aws2SqsSuppressReceiveSpansW3CPropagatorTest extends Aws2SqsSuppressReceiveSpansTest { - - @Override - void configure(AwsSdkTelemetryBuilder telemetryBuilder) { - telemetryBuilder.setUseConfiguredPropagatorForMessaging(isSqsAttributeInjectionEnabled()) // Difference to main test - .setUseXrayPropagator(isXrayInjectionEnabled()) // Disable to confirm messaging propagator actually works - } - - @Override - boolean isSqsAttributeInjectionEnabled() { - true - } - - @Override - boolean isXrayInjectionEnabled() { - false - } -} - -/** We want to test the combination of W3C + Xray, as that's what you'll get in prod if you enable W3C. */ -class Aws2SqsSuppressReceiveSpansW3CPropagatorAndXrayPropagatorTest extends Aws2SqsSuppressReceiveSpansTest { - - @Override - void configure(AwsSdkTelemetryBuilder telemetryBuilder) { - telemetryBuilder.setUseConfiguredPropagatorForMessaging(isSqsAttributeInjectionEnabled()) // Difference to main test - } - - @Override - boolean isSqsAttributeInjectionEnabled() { - true - } -} diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/java/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsDefaultPropagatorTest.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/java/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsDefaultPropagatorTest.java index 9006104b2f7b..ab0d782b85ee 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/java/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsDefaultPropagatorTest.java +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/java/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsDefaultPropagatorTest.java @@ -20,7 +20,7 @@ class Aws2SqsDefaultPropagatorTest extends Aws2SqsTracingTest { void configure(AwsSdkTelemetryBuilder telemetryBuilder) {} @Override - boolean isSqsAttributeInjectionEnabled() { + protected boolean isSqsAttributeInjectionEnabled() { return false; } diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/java/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsSuppressReceiveSpansDefaultPropagatorTest.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/java/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsSuppressReceiveSpansDefaultPropagatorTest.java new file mode 100644 index 000000000000..860766e31e41 --- /dev/null +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/java/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsSuppressReceiveSpansDefaultPropagatorTest.java @@ -0,0 +1,48 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.awssdk.v2_2; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.net.URISyntaxException; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.SqsClientBuilder; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; + +class Aws2SqsSuppressReceiveSpansDefaultPropagatorTest extends Aws2SqsSuppressReceiveSpansTest { + + @Override + protected void configure(AwsSdkTelemetryBuilder telemetryBuilder) {} + + @Override + protected boolean isSqsAttributeInjectionEnabled() { + return false; + } + + @Test + void testDuplicateTracingInterceptor() throws URISyntaxException { + SqsClientBuilder builder = SqsClient.builder(); + configureSdkClient(builder); + ClientOverrideConfiguration overrideConfiguration = + ClientOverrideConfiguration.builder() + .addExecutionInterceptor(telemetry.newExecutionInterceptor()) + .addExecutionInterceptor(telemetry.newExecutionInterceptor()) + .build(); + builder.overrideConfiguration(overrideConfiguration); + SqsClient client = configureSqsClient(builder.build()); + + client.createQueue(createQueueRequest); + client.sendMessage(sendMessageRequest); + ReceiveMessageResponse response = client.receiveMessage(receiveMessageRequest); + + assertThat(response.messages().size()).isEqualTo(1); + response.messages().forEach(message -> getTesting().runWithSpan("process child", () -> {})); + + assertSqsTraces(false, false); + } +} diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/java/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsSuppressReceiveSpansTest.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/java/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsSuppressReceiveSpansTest.java new file mode 100644 index 000000000000..6afa0eb26066 --- /dev/null +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/java/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsSuppressReceiveSpansTest.java @@ -0,0 +1,54 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.awssdk.v2_2; + +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.RegisterExtension; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.SqsClient; + +public abstract class Aws2SqsSuppressReceiveSpansTest + extends AbstractAws2SqsSuppressReceiveSpansTest { + protected AwsSdkTelemetry telemetry; + + @RegisterExtension + static final InstrumentationExtension testing = LibraryInstrumentationExtension.create(); + + @Override + protected InstrumentationExtension getTesting() { + return testing; + } + + @Override + protected SqsClient configureSqsClient(SqsClient sqsClient) { + return telemetry.wrap(sqsClient); + } + + @Override + protected SqsAsyncClient configureSqsClient(SqsAsyncClient sqsClient) { + return telemetry.wrap(sqsClient); + } + + @Override + protected ClientOverrideConfiguration.Builder createOverrideConfigurationBuilder() { + return ClientOverrideConfiguration.builder() + .addExecutionInterceptor(telemetry.newExecutionInterceptor()); + } + + protected abstract void configure(AwsSdkTelemetryBuilder telemetryBuilder); + + @BeforeEach + void setup() { + AwsSdkTelemetryBuilder telemetryBuilder = + AwsSdkTelemetry.builder(getTesting().getOpenTelemetry()) + .setCaptureExperimentalSpanAttributes(true); + configure(telemetryBuilder); + telemetry = telemetryBuilder.build(); + } +} diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/java/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsSuppressReceiveSpansW3cPropagatorAndXrayPropagatorTest.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/java/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsSuppressReceiveSpansW3cPropagatorAndXrayPropagatorTest.java new file mode 100644 index 000000000000..5289e31a5496 --- /dev/null +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/java/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsSuppressReceiveSpansW3cPropagatorAndXrayPropagatorTest.java @@ -0,0 +1,25 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.awssdk.v2_2; + +/** + * We want to test the combination of W3C + Xray, as that's what you'll get in prod if you enable + * W3C. + */ +class Aws2SqsSuppressReceiveSpansW3cPropagatorAndXrayPropagatorTest + extends Aws2SqsSuppressReceiveSpansTest { + + @Override + protected void configure(AwsSdkTelemetryBuilder telemetryBuilder) { + telemetryBuilder.setUseConfiguredPropagatorForMessaging( + isSqsAttributeInjectionEnabled()); // Difference to main test + } + + @Override + protected boolean isSqsAttributeInjectionEnabled() { + return true; + } +} diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/java/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsSuppressReceiveSpansW3cPropagatorTest.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/java/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsSuppressReceiveSpansW3cPropagatorTest.java new file mode 100644 index 000000000000..1c479a4ab0e1 --- /dev/null +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/java/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsSuppressReceiveSpansW3cPropagatorTest.java @@ -0,0 +1,27 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.awssdk.v2_2; + +class Aws2SqsSuppressReceiveSpansW3cPropagatorTest extends Aws2SqsSuppressReceiveSpansTest { + @Override + protected void configure(AwsSdkTelemetryBuilder telemetryBuilder) { + telemetryBuilder + .setUseConfiguredPropagatorForMessaging( + isSqsAttributeInjectionEnabled()) // Difference to main test + .setUseXrayPropagator( + isXrayInjectionEnabled()); // Disable to confirm messaging propagator actually works + } + + @Override + protected boolean isSqsAttributeInjectionEnabled() { + return true; + } + + @Override + protected boolean isXrayInjectionEnabled() { + return false; + } +} diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/java/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsW3cPropagatorAndXrayPropagatorTest.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/java/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsW3cPropagatorAndXrayPropagatorTest.java index a0d2da870136..7286442e97b2 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/java/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsW3cPropagatorAndXrayPropagatorTest.java +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/java/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsW3cPropagatorAndXrayPropagatorTest.java @@ -13,7 +13,7 @@ void configure(AwsSdkTelemetryBuilder telemetryBuilder) { } @Override - boolean isSqsAttributeInjectionEnabled() { + protected boolean isSqsAttributeInjectionEnabled() { return true; } } diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/java/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsW3cPropagatorTest.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/java/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsW3cPropagatorTest.java index 9e8076e5f3a6..3a8636b58fb5 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/java/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsW3cPropagatorTest.java +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/java/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsW3cPropagatorTest.java @@ -17,12 +17,12 @@ void configure(AwsSdkTelemetryBuilder telemetryBuilder) { } @Override - boolean isSqsAttributeInjectionEnabled() { + protected boolean isSqsAttributeInjectionEnabled() { return true; } @Override - boolean isXrayInjectionEnabled() { + protected boolean isXrayInjectionEnabled() { return false; } } diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsSuppressReceiveSpansTest.groovy b/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsSuppressReceiveSpansTest.groovy deleted file mode 100644 index a0dd2a8adae0..000000000000 --- a/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsSuppressReceiveSpansTest.groovy +++ /dev/null @@ -1,393 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.instrumentation.awssdk.v2_2 - -import io.opentelemetry.instrumentation.test.InstrumentationSpecification -import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes -import io.opentelemetry.semconv.incubating.AwsIncubatingAttributes -import io.opentelemetry.semconv.ServerAttributes -import io.opentelemetry.semconv.HttpAttributes -import io.opentelemetry.semconv.UrlAttributes -import org.elasticmq.rest.sqs.SQSRestServerBuilder -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider -import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration -import software.amazon.awssdk.regions.Region -import software.amazon.awssdk.services.sqs.SqsAsyncClient -import software.amazon.awssdk.services.sqs.SqsBaseClientBuilder -import software.amazon.awssdk.services.sqs.SqsClient -import software.amazon.awssdk.services.sqs.model.CreateQueueRequest -import software.amazon.awssdk.services.sqs.model.MessageAttributeValue -import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest -import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest -import software.amazon.awssdk.services.sqs.model.SendMessageRequest -import spock.lang.Shared - -import static io.opentelemetry.api.trace.SpanKind.CLIENT -import static io.opentelemetry.api.trace.SpanKind.CONSUMER -import static io.opentelemetry.api.trace.SpanKind.PRODUCER - -abstract class AbstractAws2SqsSuppressReceiveSpansTest extends InstrumentationSpecification { - - private static final StaticCredentialsProvider CREDENTIALS_PROVIDER = StaticCredentialsProvider - .create(AwsBasicCredentials.create("my-access-key", "my-secret-key")) - - @Shared - def sqs - - @Shared - int sqsPort - - static Map dummyMessageAttributes(count) { - (0.. e.messageBody("e1").id("i1"), - // 8 attributes, injection always possible - e -> e.messageBody("e2").id("i2") - .messageAttributes(dummyMessageAttributes(8)), - // 10 attributes, injection with custom propagator never possible - e -> e.messageBody("e3").id("i3").messageAttributes(dummyMessageAttributes(10))) - .build() - - boolean isSqsAttributeInjectionEnabled() { - AbstractAws2ClientCoreTest.isSqsAttributeInjectionEnabled() - } - - boolean isXrayInjectionEnabled() { - true - } - - void configureSdkClient(SqsBaseClientBuilder builder) { - builder - .overrideConfiguration(createOverrideConfigurationBuilder().build()) - .endpointOverride(new URI("http://localhost:" + sqsPort)) - builder - .region(Region.AP_NORTHEAST_1) - .credentialsProvider(CREDENTIALS_PROVIDER) - } - - abstract SqsClient configureSqsClient(SqsClient sqsClient) - - abstract SqsAsyncClient configureSqsClient(SqsAsyncClient sqsClient) - - abstract ClientOverrideConfiguration.Builder createOverrideConfigurationBuilder() - - def setupSpec() { - sqs = SQSRestServerBuilder.withPort(0).withInterface("localhost").start() - def server = sqs.waitUntilStarted() - sqsPort = server.localAddress().port - println getClass().name + " SQS server started at: localhost:$sqsPort/" - } - - def cleanupSpec() { - if (sqs != null) { - sqs.stopAndWait() - } - } - - void assertSqsTraces(withParent = false) { - assertTraces(2 + (withParent ? 1 : 0)) { - trace(0, 1) { - - span(0) { - name "Sqs.CreateQueue" - kind CLIENT - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.queue.name" "testSdkSqs" - "$AwsIncubatingAttributes.AWS_REQUEST_ID" { it == "00000000-0000-0000-0000-000000000000" || it == "UNKNOWN" } - "rpc.system" "aws-api" - "rpc.service" "Sqs" - "rpc.method" "CreateQueue" - "$HttpAttributes.HTTP_REQUEST_METHOD" "POST" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_FULL" { it.startsWith("http://localhost:$sqsPort") } - "$ServerAttributes.SERVER_ADDRESS" "localhost" - "$ServerAttributes.SERVER_PORT" sqsPort - } - } - } - trace(1, 3) { - span(0) { - name "testSdkSqs publish" - kind PRODUCER - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs" - "$AwsIncubatingAttributes.AWS_REQUEST_ID" { it == "00000000-0000-0000-0000-000000000000" || it == "UNKNOWN" } - "rpc.system" "aws-api" - "rpc.method" "SendMessage" - "rpc.service" "Sqs" - "$HttpAttributes.HTTP_REQUEST_METHOD" "POST" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_FULL" { it.startsWith("http://localhost:$sqsPort") } - "$ServerAttributes.SERVER_ADDRESS" "localhost" - "$ServerAttributes.SERVER_PORT" sqsPort - "$MessagingIncubatingAttributes.MESSAGING_SYSTEM" MessagingIncubatingAttributes.MessagingSystemIncubatingValues.AWS_SQS - "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs" - "$MessagingIncubatingAttributes.MESSAGING_OPERATION" "publish" - "$MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID" String - } - } - span(1) { - name "testSdkSqs process" - kind CONSUMER - childOf span(0) - hasNoLinks() - attributes { - "aws.agent" "java-aws-sdk" - "rpc.method" "ReceiveMessage" - "rpc.system" "aws-api" - "rpc.service" "Sqs" - "$HttpAttributes.HTTP_REQUEST_METHOD" "POST" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_FULL" { it.startsWith("http://localhost:$sqsPort") } - "$ServerAttributes.SERVER_ADDRESS" "localhost" - "$ServerAttributes.SERVER_PORT" sqsPort - "$MessagingIncubatingAttributes.MESSAGING_SYSTEM" MessagingIncubatingAttributes.MessagingSystemIncubatingValues.AWS_SQS - "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs" - "$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process" - "$MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID" String - } - } - span(2) { - name "process child" - childOf span(1) - attributes { - } - } - } - if (withParent) { - /** - * This span represents HTTP "sending of receive message" operation. It's always single, while there can be multiple CONSUMER spans (one per consumed message). - * This one could be suppressed (by IF in TracingRequestHandler#beforeRequest but then HTTP instrumentation span would appear - */ - trace(2, 2) { - span(0) { - name "parent" - hasNoParent() - } - span(1) { - name "Sqs.ReceiveMessage" - kind CLIENT - childOf span(0) - hasNoLinks() - attributes { - "aws.agent" "java-aws-sdk" - "$AwsIncubatingAttributes.AWS_REQUEST_ID" { it == "00000000-0000-0000-0000-000000000000" || it == "UNKNOWN" } - "rpc.method" "ReceiveMessage" - "aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs" - "rpc.system" "aws-api" - "rpc.service" "Sqs" - "$HttpAttributes.HTTP_REQUEST_METHOD" "POST" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_FULL" { it.startsWith("http://localhost:$sqsPort") } - "$ServerAttributes.SERVER_ADDRESS" "localhost" - "$ServerAttributes.SERVER_PORT" sqsPort - } - } - } - } - } - } - - def "simple sqs producer-consumer services: sync"() { - setup: - def builder = SqsClient.builder() - configureSdkClient(builder) - def client = configureSqsClient(builder.build()) - - client.createQueue(createQueueRequest) - - when: - client.sendMessage(sendMessageRequest) - - def resp = client.receiveMessage(receiveMessageRequest) - - then: - resp.messages.size() == 1 - resp.messages.each {message -> runWithSpan("process child") {}} - assertSqsTraces() - } - - def "simple sqs producer-consumer services with parent: sync"() { - setup: - def builder = SqsClient.builder() - configureSdkClient(builder) - def client = configureSqsClient(builder.build()) - - client.createQueue(createQueueRequest) - - when: - client.sendMessage(sendMessageRequest) - - def resp = runWithSpan("parent") { - client.receiveMessage(receiveMessageRequest) - } - - then: - resp.messages.size() == 1 - resp.messages.each {message -> runWithSpan("process child") {}} - assertSqsTraces(true) - } - - def "simple sqs producer-consumer services: async"() { - setup: - def builder = SqsAsyncClient.builder() - configureSdkClient(builder) - def client = configureSqsClient(builder.build()) - - client.createQueue(createQueueRequest).get() - - when: - client.sendMessage(sendMessageRequest).get() - - def resp = client.receiveMessage(receiveMessageRequest).get() - - then: - resp.messages.size() == 1 - resp.messages.each {message -> runWithSpan("process child") {}} - assertSqsTraces() - } - - def "batch sqs producer-consumer services: sync"() { - setup: - def builder = SqsClient.builder() - configureSdkClient(builder) - def client = configureSqsClient(builder.build()) - - client.createQueue(createQueueRequest) - - when: - client.sendMessageBatch(sendMessageBatchRequest) - - def resp = client.receiveMessage(receiveMessageBatchRequest) - def totalAttrs = resp.messages().sum {it.messageAttributes().size() } - - then: - resp.messages().size() == 3 - - // +2: 3 messages, 2x traceparent, 1x not injected due to too many attrs - totalAttrs == 18 + (sqsAttributeInjectionEnabled ? 2 : 0) - - assertTraces(xrayInjectionEnabled ? 2 : 3) { - trace(0, 1) { - - span(0) { - name "Sqs.CreateQueue" - kind CLIENT - } - } - trace(1, xrayInjectionEnabled ? 4 : 3) { - span(0) { - name "testSdkSqs publish" - kind PRODUCER - hasNoParent() - attributes { - "aws.agent" "java-aws-sdk" - "aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs" - "$AwsIncubatingAttributes.AWS_REQUEST_ID" { it.trim() == "00000000-0000-0000-0000-000000000000" || it == "UNKNOWN" } - "rpc.system" "aws-api" - "rpc.method" "SendMessageBatch" - "rpc.service" "Sqs" - "$HttpAttributes.HTTP_REQUEST_METHOD" "POST" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_FULL" { it.startsWith("http://localhost:$sqsPort") } - "$ServerAttributes.SERVER_ADDRESS" "localhost" - "$ServerAttributes.SERVER_PORT" sqsPort - "$MessagingIncubatingAttributes.MESSAGING_SYSTEM" MessagingIncubatingAttributes.MessagingSystemIncubatingValues.AWS_SQS - "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs" - "$MessagingIncubatingAttributes.MESSAGING_OPERATION" "publish" - } - } - for (int i: 1..(xrayInjectionEnabled ? 3 : 2)) { - span(i) { - name "testSdkSqs process" - kind CONSUMER - childOf span(0) - hasNoLinks() - - attributes { - "aws.agent" "java-aws-sdk" - "rpc.method" "ReceiveMessage" - "rpc.system" "aws-api" - "rpc.service" "Sqs" - "$HttpAttributes.HTTP_REQUEST_METHOD" "POST" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_FULL" { it.startsWith("http://localhost:$sqsPort") } - "$ServerAttributes.SERVER_ADDRESS" "localhost" - "$ServerAttributes.SERVER_PORT" sqsPort - "$MessagingIncubatingAttributes.MESSAGING_SYSTEM" MessagingIncubatingAttributes.MessagingSystemIncubatingValues.AWS_SQS - "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs" - "$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process" - "$MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID" String - } - } - } - } - if (!xrayInjectionEnabled) { - trace(2, 1) { - span(0) { - name "testSdkSqs process" - kind CONSUMER - - // TODO This is not nice at all, and can also happen if producer is not instrumented - hasNoParent() - hasNoLinks() - - attributes { - "aws.agent" "java-aws-sdk" - "rpc.method" "ReceiveMessage" - "rpc.system" "aws-api" - "rpc.service" "Sqs" - "$HttpAttributes.HTTP_REQUEST_METHOD" "POST" - "$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200 - "$UrlAttributes.URL_FULL" { it.startsWith("http://localhost:$sqsPort") } - "$ServerAttributes.SERVER_ADDRESS" "localhost" - "$ServerAttributes.SERVER_PORT" sqsPort - "$MessagingIncubatingAttributes.MESSAGING_SYSTEM" MessagingIncubatingAttributes.MessagingSystemIncubatingValues.AWS_SQS - "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs" - "$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process" - "$MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID" String - } - } - } - } - } - } -} diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsBaseTest.java b/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsBaseTest.java new file mode 100644 index 000000000000..902bfdc0d4ec --- /dev/null +++ b/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsBaseTest.java @@ -0,0 +1,275 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.awssdk.v2_2; + +import static io.opentelemetry.api.common.AttributeKey.stringKey; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static io.opentelemetry.semconv.HttpAttributes.HTTP_REQUEST_METHOD; +import static io.opentelemetry.semconv.HttpAttributes.HTTP_RESPONSE_STATUS_CODE; +import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS; +import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT; +import static io.opentelemetry.semconv.UrlAttributes.URL_FULL; +import static io.opentelemetry.semconv.incubating.AwsIncubatingAttributes.AWS_REQUEST_ID; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MessagingSystemIncubatingValues.AWS_SQS; +import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_METHOD; +import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_SERVICE; +import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_SYSTEM; +import static org.assertj.core.api.Assertions.assertThat; + +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.api.internal.ConfigPropertiesUtil; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.sdk.testing.assertj.SpanDataAssert; +import io.opentelemetry.sdk.trace.data.SpanData; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.Map; +import org.apache.pekko.http.scaladsl.Http; +import org.elasticmq.rest.sqs.SQSRestServer; +import org.elasticmq.rest.sqs.SQSRestServerBuilder; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.SqsAsyncClientBuilder; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.SqsClientBuilder; +import software.amazon.awssdk.services.sqs.model.CreateQueueRequest; +import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageRequest; + +public abstract class AbstractAws2SqsBaseTest { + protected static final StaticCredentialsProvider CREDENTIALS_PROVIDER = + StaticCredentialsProvider.create( + AwsBasicCredentials.create("my-access-key", "my-secret-key")); + protected static int sqsPort; + protected static SQSRestServer sqs; + protected final String queueUrl = "http://localhost:" + sqsPort + "/000000000000/testSdkSqs"; + + protected ReceiveMessageRequest receiveMessageRequest = + ReceiveMessageRequest.builder().queueUrl(queueUrl).build(); + + protected ReceiveMessageRequest receiveMessageBatchRequest = + ReceiveMessageRequest.builder() + .queueUrl(queueUrl) + .maxNumberOfMessages(3) + .messageAttributeNames("All") + .waitTimeSeconds(5) + .build(); + + protected CreateQueueRequest createQueueRequest = + CreateQueueRequest.builder().queueName("testSdkSqs").build(); + + protected SendMessageRequest sendMessageRequest = + SendMessageRequest.builder().queueUrl(queueUrl).messageBody("{\"type\": \"hello\"}").build(); + + @SuppressWarnings("unchecked") + protected SendMessageBatchRequest sendMessageBatchRequest = + SendMessageBatchRequest.builder() + .queueUrl(queueUrl) + .entries( + e -> e.messageBody("e1").id("i1"), + // 8 attributes, injection always possible + e -> e.messageBody("e2").id("i2").messageAttributes(dummyMessageAttributes(8)), + // 10 attributes, injection with custom propagator never possible + e -> e.messageBody("e3").id("i3").messageAttributes(dummyMessageAttributes(10))) + .build(); + + protected abstract InstrumentationExtension getTesting(); + + protected abstract SqsClient configureSqsClient(SqsClient sqsClient); + + protected abstract SqsAsyncClient configureSqsClient(SqsAsyncClient sqsClient); + + protected abstract ClientOverrideConfiguration.Builder createOverrideConfigurationBuilder(); + + protected abstract void assertSqsTraces(boolean withParent, boolean captureHeaders); + + static Map dummyMessageAttributes(int count) { + Map map = new HashMap<>(); + for (int i = 0; i < count; i++) { + map.put( + "a" + i, MessageAttributeValue.builder().stringValue("v" + i).dataType("String").build()); + } + return map; + } + + protected boolean isXrayInjectionEnabled() { + return true; + } + + protected void configureSdkClient(SqsClientBuilder builder) throws URISyntaxException { + builder + .overrideConfiguration(createOverrideConfigurationBuilder().build()) + .endpointOverride(new URI("http://localhost:" + sqsPort)); + builder.region(Region.AP_NORTHEAST_1).credentialsProvider(CREDENTIALS_PROVIDER); + } + + protected void configureSdkClient(SqsAsyncClientBuilder builder) throws URISyntaxException { + builder + .overrideConfiguration(createOverrideConfigurationBuilder().build()) + .endpointOverride(new URI("http://localhost:" + sqsPort)); + builder.region(Region.AP_NORTHEAST_1).credentialsProvider(CREDENTIALS_PROVIDER); + } + + protected boolean isSqsAttributeInjectionEnabled() { + // See io.opentelemetry.instrumentation.awssdk.v2_2.autoconfigure.TracingExecutionInterceptor + return ConfigPropertiesUtil.getBoolean( + "otel.instrumentation.aws-sdk.experimental-use-propagator-for-messaging", false); + } + + @BeforeAll + static void setUp() { + sqs = SQSRestServerBuilder.withPort(0).withInterface("localhost").start(); + Http.ServerBinding server = sqs.waitUntilStarted(); + sqsPort = server.localAddress().getPort(); + } + + @AfterAll + static void cleanUp() { + if (sqs != null) { + sqs.stopAndWait(); + } + } + + @Test + void testSimpleSqsProducerConsumerServicesSync() throws URISyntaxException { + SqsClientBuilder builder = SqsClient.builder(); + configureSdkClient(builder); + SqsClient client = configureSqsClient(builder.build()); + + client.createQueue(createQueueRequest); + client.sendMessage(sendMessageRequest); + + ReceiveMessageResponse response = client.receiveMessage(receiveMessageRequest); + + assertThat(response.messages().size()).isEqualTo(1); + + response.messages().forEach(message -> getTesting().runWithSpan("process child", () -> {})); + assertSqsTraces(false, false); + } + + @Test + void testSimpleSqsProducerConsumerServicesWithParentSync() throws URISyntaxException { + SqsClientBuilder builder = SqsClient.builder(); + configureSdkClient(builder); + SqsClient client = configureSqsClient(builder.build()); + + client.createQueue(createQueueRequest); + client.sendMessage(sendMessageRequest); + + ReceiveMessageResponse response = + getTesting().runWithSpan("parent", () -> client.receiveMessage(receiveMessageRequest)); + + assertThat(response.messages().size()).isEqualTo(1); + + response.messages().forEach(message -> getTesting().runWithSpan("process child", () -> {})); + assertSqsTraces(true, false); + } + + @SuppressWarnings("InterruptedExceptionSwallowed") + @Test + void testSimpleSqsProducerConsumerServicesAsync() throws Exception { + SqsAsyncClientBuilder builder = SqsAsyncClient.builder(); + configureSdkClient(builder); + SqsAsyncClient client = configureSqsClient(builder.build()); + + client.createQueue(createQueueRequest).get(); + client.sendMessage(sendMessageRequest).get(); + + ReceiveMessageResponse response = client.receiveMessage(receiveMessageRequest).get(); + + assertThat(response.messages().size()).isEqualTo(1); + + response.messages().forEach(message -> getTesting().runWithSpan("process child", () -> {})); + assertSqsTraces(false, false); + } + + static SpanDataAssert createQueueSpan(SpanDataAssert span) { + return span.hasName("Sqs.CreateQueue") + .hasKind(SpanKind.CLIENT) + .hasNoParent() + .hasAttributesSatisfyingExactly( + equalTo(stringKey("aws.agent"), "java-aws-sdk"), + equalTo(stringKey("aws.queue.name"), "testSdkSqs"), + satisfies( + AWS_REQUEST_ID, + val -> val.matches("\\s*00000000-0000-0000-0000-000000000000\\s*|UNKNOWN")), + equalTo(RPC_SYSTEM, "aws-api"), + equalTo(RPC_SERVICE, "Sqs"), + equalTo(RPC_METHOD, "CreateQueue"), + equalTo(HTTP_REQUEST_METHOD, "POST"), + equalTo(HTTP_RESPONSE_STATUS_CODE, 200), + satisfies(URL_FULL, v -> v.startsWith("http://localhost:" + sqsPort)), + equalTo(SERVER_ADDRESS, "localhost"), + equalTo(SERVER_PORT, sqsPort)); + } + + @SuppressWarnings("deprecation") // using deprecated semconv + static SpanDataAssert processSpan(SpanDataAssert span, SpanData parent) { + return span.hasName("testSdkSqs process") + .hasKind(SpanKind.CONSUMER) + .hasParent(parent) + .hasTotalRecordedLinks(0) + .hasAttributesSatisfyingExactly( + equalTo(stringKey("aws.agent"), "java-aws-sdk"), + equalTo(RPC_SYSTEM, "aws-api"), + equalTo(RPC_SERVICE, "Sqs"), + equalTo(RPC_METHOD, "ReceiveMessage"), + equalTo(HTTP_REQUEST_METHOD, "POST"), + equalTo(HTTP_RESPONSE_STATUS_CODE, 200), + satisfies(URL_FULL, v -> v.startsWith("http://localhost:" + sqsPort)), + equalTo(SERVER_ADDRESS, "localhost"), + equalTo(SERVER_PORT, sqsPort), + equalTo(MESSAGING_SYSTEM, AWS_SQS), + equalTo(MESSAGING_DESTINATION_NAME, "testSdkSqs"), + equalTo(MESSAGING_OPERATION, "process"), + satisfies(MESSAGING_MESSAGE_ID, v -> v.isInstanceOf(String.class))); + } + + @SuppressWarnings("deprecation") // using deprecated semconv + static SpanDataAssert publishSpan(SpanDataAssert span, String queueUrl, String rcpMethod) { + return span.hasName("testSdkSqs publish") + .hasKind(SpanKind.PRODUCER) + .hasNoParent() + .hasAttributesSatisfyingExactly( + equalTo(stringKey("aws.agent"), "java-aws-sdk"), + equalTo(stringKey("aws.queue.url"), queueUrl), + satisfies( + AWS_REQUEST_ID, + val -> val.matches("\\s*00000000-0000-0000-0000-000000000000\\s*|UNKNOWN")), + equalTo(RPC_SYSTEM, "aws-api"), + equalTo(RPC_SERVICE, "Sqs"), + equalTo(RPC_METHOD, rcpMethod), + equalTo(HTTP_REQUEST_METHOD, "POST"), + equalTo(HTTP_RESPONSE_STATUS_CODE, 200), + satisfies(URL_FULL, v -> v.startsWith("http://localhost:" + sqsPort)), + equalTo(SERVER_ADDRESS, "localhost"), + equalTo(SERVER_PORT, sqsPort), + equalTo(MESSAGING_SYSTEM, AWS_SQS), + equalTo(MESSAGING_DESTINATION_NAME, "testSdkSqs"), + equalTo(MESSAGING_OPERATION, "publish"), + satisfies( + MESSAGING_MESSAGE_ID, + val -> + val.satisfiesAnyOf( + v -> assertThat(v).isInstanceOf(String.class), + v -> assertThat(v).isNull()))); + } +} diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsSuppressReceiveSpansTest.java b/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsSuppressReceiveSpansTest.java new file mode 100644 index 000000000000..4d0a9be89c3f --- /dev/null +++ b/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsSuppressReceiveSpansTest.java @@ -0,0 +1,161 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.awssdk.v2_2; + +import static io.opentelemetry.api.common.AttributeKey.stringKey; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static io.opentelemetry.semconv.HttpAttributes.HTTP_REQUEST_METHOD; +import static io.opentelemetry.semconv.HttpAttributes.HTTP_RESPONSE_STATUS_CODE; +import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS; +import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT; +import static io.opentelemetry.semconv.UrlAttributes.URL_FULL; +import static io.opentelemetry.semconv.incubating.AwsIncubatingAttributes.AWS_REQUEST_ID; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MessagingSystemIncubatingValues.AWS_SQS; +import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_METHOD; +import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_SERVICE; +import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_SYSTEM; +import static java.util.Collections.singletonList; +import static org.assertj.core.api.Assertions.assertThat; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.sdk.testing.assertj.SpanDataAssert; +import io.opentelemetry.sdk.testing.assertj.TraceAssert; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.function.Consumer; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.SqsClientBuilder; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; + +public abstract class AbstractAws2SqsSuppressReceiveSpansTest extends AbstractAws2SqsBaseTest { + + @Override + protected void assertSqsTraces(boolean withParent, boolean captureHeaders) { + List> traceAsserts = + new ArrayList<>( + Arrays.asList( + trace -> trace.hasSpansSatisfyingExactly(span -> createQueueSpan(span)), + trace -> + trace.hasSpansSatisfyingExactly( + span -> publishSpan(span, queueUrl, "SendMessage"), + span -> processSpan(span, trace.getSpan(0)), + span -> + span.hasName("process child") + .hasParent(trace.getSpan(1)) + .hasAttributes(Attributes.empty())))); + + if (withParent) { + /* + * This span represents HTTP "sending of receive message" operation. It's always single, + * while there can be multiple CONSUMER spans (one per consumed message). + * This one could be suppressed (by IF in TracingRequestHandler#beforeRequest but then + * HTTP instrumentation span would appear) + */ + traceAsserts.add( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent(), + span -> + span.hasName("Sqs.ReceiveMessage") + .hasKind(SpanKind.CLIENT) + .hasTotalRecordedLinks(0) + .hasAttributesSatisfyingExactly( + equalTo(stringKey("aws.agent"), "java-aws-sdk"), + equalTo(stringKey("aws.queue.url"), queueUrl), + satisfies( + AWS_REQUEST_ID, + val -> + val.matches( + "\\s*00000000-0000-0000-0000-000000000000\\s*|UNKNOWN")), + equalTo(RPC_SYSTEM, "aws-api"), + equalTo(RPC_SERVICE, "Sqs"), + equalTo(RPC_METHOD, "ReceiveMessage"), + equalTo(HTTP_REQUEST_METHOD, "POST"), + equalTo(HTTP_RESPONSE_STATUS_CODE, 200), + satisfies(URL_FULL, v -> v.startsWith("http://localhost:" + sqsPort)), + equalTo(SERVER_ADDRESS, "localhost"), + equalTo(SERVER_PORT, sqsPort)))); + } + + getTesting().waitAndAssertTraces(traceAsserts); + } + + @Test + @SuppressWarnings("deprecation") // using deprecated semconv + void testBatchSqsProducerConsumerServicesSync() throws URISyntaxException { + SqsClientBuilder builder = SqsClient.builder(); + configureSdkClient(builder); + SqsClient client = configureSqsClient(builder.build()); + + client.createQueue(createQueueRequest); + client.sendMessageBatch(sendMessageBatchRequest); + + ReceiveMessageResponse response = client.receiveMessage(receiveMessageBatchRequest); + + int totalAttrs = + response.messages().stream().mapToInt(message -> message.messageAttributes().size()).sum(); + + // generates the process spans + response.messages().forEach(message -> {}); + + assertThat(response.messages().size()).isEqualTo(3); + + // +2: 3 messages, 2x traceparent, 1x not injected due to too many attrs + assertThat(totalAttrs).isEqualTo(18 + (isSqsAttributeInjectionEnabled() ? 2 : 0)); + + List> traceAsserts = + new ArrayList<>( + Arrays.asList( + trace -> trace.hasSpansSatisfyingExactly(span -> createQueueSpan(span)), + trace -> { + List> spanAsserts = + new ArrayList<>( + singletonList(span -> publishSpan(span, queueUrl, "SendMessageBatch"))); + + for (int i = 0; i <= (isXrayInjectionEnabled() ? 2 : 1); i++) { + spanAsserts.add(span -> processSpan(span, trace.getSpan(0))); + } + trace.hasSpansSatisfyingExactly(spanAsserts); + })); + + if (!isXrayInjectionEnabled()) { + traceAsserts.add( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("testSdkSqs process") + .hasKind(SpanKind.CONSUMER) + // TODO: This is not good, and can also happen if producer is not + // instrumented + .hasNoParent() + .hasTotalRecordedLinks(0) + .hasAttributesSatisfyingExactly( + equalTo(stringKey("aws.agent"), "java-aws-sdk"), + equalTo(RPC_SYSTEM, "aws-api"), + equalTo(RPC_SERVICE, "Sqs"), + equalTo(RPC_METHOD, "ReceiveMessage"), + equalTo(HTTP_REQUEST_METHOD, "POST"), + equalTo(HTTP_RESPONSE_STATUS_CODE, 200), + satisfies(URL_FULL, v -> v.startsWith("http://localhost:" + sqsPort)), + equalTo(SERVER_ADDRESS, "localhost"), + equalTo(SERVER_PORT, sqsPort), + equalTo(MESSAGING_SYSTEM, AWS_SQS), + equalTo(MESSAGING_DESTINATION_NAME, "testSdkSqs"), + equalTo(MESSAGING_OPERATION, "process"), + satisfies(MESSAGING_MESSAGE_ID, v -> v.isInstanceOf(String.class))))); + } + getTesting().waitAndAssertTraces(traceAsserts); + } +} diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsTracingTest.java b/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsTracingTest.java index e8085e14eccf..6fa897d46292 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsTracingTest.java +++ b/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsTracingTest.java @@ -14,186 +14,51 @@ import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS; import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT; import static io.opentelemetry.semconv.UrlAttributes.URL_FULL; +import static io.opentelemetry.semconv.incubating.AwsIncubatingAttributes.AWS_REQUEST_ID; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM; +import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MessagingSystemIncubatingValues.AWS_SQS; import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_METHOD; import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_SERVICE; import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_SYSTEM; +import static java.util.Collections.singletonList; import static org.assertj.core.api.Assertions.assertThat; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.trace.SpanKind; -import io.opentelemetry.instrumentation.api.internal.ConfigPropertiesUtil; -import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; import io.opentelemetry.sdk.testing.assertj.AttributeAssertion; import io.opentelemetry.sdk.testing.assertj.SpanDataAssert; import io.opentelemetry.sdk.trace.data.SpanData; -import io.opentelemetry.semconv.incubating.AwsIncubatingAttributes; -import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes; import io.opentelemetry.testing.internal.armeria.internal.shaded.guava.collect.ImmutableList; -import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; -import org.apache.pekko.http.scaladsl.Http; -import org.elasticmq.rest.sqs.SQSRestServer; -import org.elasticmq.rest.sqs.SQSRestServerBuilder; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; -import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.sqs.SqsAsyncClient; -import software.amazon.awssdk.services.sqs.SqsAsyncClientBuilder; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.SqsClientBuilder; -import software.amazon.awssdk.services.sqs.model.CreateQueueRequest; import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; -import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.SendMessageRequest; @SuppressWarnings("deprecation") // using deprecated semconv -public abstract class AbstractAws2SqsTracingTest { +public abstract class AbstractAws2SqsTracingTest extends AbstractAws2SqsBaseTest { - protected abstract InstrumentationExtension getTesting(); - - protected abstract SqsClient configureSqsClient(SqsClient sqsClient); - - protected abstract SqsAsyncClient configureSqsClient(SqsAsyncClient sqsClient); - - protected abstract ClientOverrideConfiguration.Builder createOverrideConfigurationBuilder(); - - private static final StaticCredentialsProvider CREDENTIALS_PROVIDER = - StaticCredentialsProvider.create( - AwsBasicCredentials.create("my-access-key", "my-secret-key")); - - private static int sqsPort; - private static SQSRestServer sqs; - - static Map dummyMessageAttributes(int count) { - Map map = new HashMap<>(); - for (int i = 0; i < count; i++) { - map.put( - "a" + i, MessageAttributeValue.builder().stringValue("v" + i).dataType("String").build()); - } - return map; - } - - private final String queueUrl = "http://localhost:" + sqsPort + "/000000000000/testSdkSqs"; - - ReceiveMessageRequest receiveMessageRequest = - ReceiveMessageRequest.builder().queueUrl(queueUrl).build(); - - ReceiveMessageRequest receiveMessageBatchRequest = - ReceiveMessageRequest.builder() - .queueUrl(queueUrl) - .maxNumberOfMessages(3) - .messageAttributeNames("All") - .waitTimeSeconds(5) - .build(); - - CreateQueueRequest createQueueRequest = - CreateQueueRequest.builder().queueName("testSdkSqs").build(); - - SendMessageRequest sendMessageRequest = - SendMessageRequest.builder().queueUrl(queueUrl).messageBody("{\"type\": \"hello\"}").build(); - - @SuppressWarnings("unchecked") - SendMessageBatchRequest sendMessageBatchRequest = - SendMessageBatchRequest.builder() - .queueUrl(queueUrl) - .entries( - e -> e.messageBody("e1").id("i1"), - // 8 attributes, injection always possible - e -> e.messageBody("e2").id("i2").messageAttributes(dummyMessageAttributes(8)), - // 10 attributes, injection with custom propagator never possible - e -> e.messageBody("e3").id("i3").messageAttributes(dummyMessageAttributes(10))) - .build(); - - boolean isSqsAttributeInjectionEnabled() { - // See io.opentelemetry.instrumentation.awssdk.v2_2.autoconfigure.TracingExecutionInterceptor - return ConfigPropertiesUtil.getBoolean( - "otel.instrumentation.aws-sdk.experimental-use-propagator-for-messaging", false); - } - - boolean isXrayInjectionEnabled() { - return true; - } - - void configureSdkClient(SqsClientBuilder builder) throws URISyntaxException { - builder - .overrideConfiguration(createOverrideConfigurationBuilder().build()) - .endpointOverride(new URI("http://localhost:" + sqsPort)); - builder.region(Region.AP_NORTHEAST_1).credentialsProvider(CREDENTIALS_PROVIDER); - } - - void configureSdkClient(SqsAsyncClientBuilder builder) throws URISyntaxException { - builder - .overrideConfiguration(createOverrideConfigurationBuilder().build()) - .endpointOverride(new URI("http://localhost:" + sqsPort)); - builder.region(Region.AP_NORTHEAST_1).credentialsProvider(CREDENTIALS_PROVIDER); - } - - @BeforeAll - static void setUp() { - sqs = SQSRestServerBuilder.withPort(0).withInterface("localhost").start(); - Http.ServerBinding server = sqs.waitUntilStarted(); - sqsPort = server.localAddress().getPort(); - } - - @AfterAll - static void cleanUp() { - if (sqs != null) { - sqs.stopAndWait(); - } - } - - void assertSqsTraces(Boolean withParent, Boolean captureHeaders) { + @Override + protected void assertSqsTraces(boolean withParent, boolean captureHeaders) { int offset = withParent ? 2 : 0; AtomicReference publishSpan = new AtomicReference<>(); getTesting() .waitAndAssertTraces( - trace -> - trace.hasSpansSatisfyingExactly( - span -> - span.hasName("Sqs.CreateQueue") - .hasKind(SpanKind.CLIENT) - .hasNoParent() - .hasAttributesSatisfyingExactly( - equalTo(stringKey("aws.agent"), "java-aws-sdk"), - equalTo(stringKey("aws.queue.name"), "testSdkSqs"), - satisfies( - AwsIncubatingAttributes.AWS_REQUEST_ID, - val -> - val.satisfiesAnyOf( - v -> - assertThat(v) - .isEqualTo( - "00000000-0000-0000-0000-000000000000"), - v -> assertThat(v).isEqualTo("UNKNOWN"))), - equalTo(RPC_SYSTEM, "aws-api"), - equalTo(RPC_SERVICE, "Sqs"), - equalTo(RPC_METHOD, "CreateQueue"), - equalTo(HTTP_REQUEST_METHOD, "POST"), - equalTo(HTTP_RESPONSE_STATUS_CODE, 200), - satisfies( - URL_FULL, v -> v.startsWith("http://localhost:" + sqsPort)), - equalTo(SERVER_ADDRESS, "localhost"), - equalTo(SERVER_PORT, sqsPort))), + trace -> trace.hasSpansSatisfyingExactly(span -> createQueueSpan(span)), trace -> trace.hasSpansSatisfyingExactly( span -> { @@ -206,14 +71,10 @@ void assertSqsTraces(Boolean withParent, Boolean captureHeaders) { stringKey("aws.queue.url"), "http://localhost:" + sqsPort + "/000000000000/testSdkSqs"), satisfies( - AwsIncubatingAttributes.AWS_REQUEST_ID, + AWS_REQUEST_ID, val -> - val.satisfiesAnyOf( - v -> - assertThat(v) - .isEqualTo( - "00000000-0000-0000-0000-000000000000"), - v -> assertThat(v).isEqualTo("UNKNOWN"))), + val.matches( + "\\s*00000000-0000-0000-0000-000000000000\\s*|UNKNOWN")), equalTo(RPC_SYSTEM, "aws-api"), equalTo(RPC_SERVICE, "Sqs"), equalTo(RPC_METHOD, "SendMessage"), @@ -223,10 +84,7 @@ void assertSqsTraces(Boolean withParent, Boolean captureHeaders) { URL_FULL, v -> v.startsWith("http://localhost:" + sqsPort)), equalTo(SERVER_ADDRESS, "localhost"), equalTo(SERVER_PORT, sqsPort), - equalTo( - MESSAGING_SYSTEM, - MessagingIncubatingAttributes.MessagingSystemIncubatingValues - .AWS_SQS), + equalTo(MESSAGING_SYSTEM, AWS_SQS), equalTo(MESSAGING_DESTINATION_NAME, "testSdkSqs"), equalTo(MESSAGING_OPERATION, "publish"), satisfies( @@ -266,14 +124,10 @@ void assertSqsTraces(Boolean withParent, Boolean captureHeaders) { stringKey("aws.queue.url"), "http://localhost:" + sqsPort + "/000000000000/testSdkSqs"), satisfies( - AwsIncubatingAttributes.AWS_REQUEST_ID, + AWS_REQUEST_ID, val -> - val.satisfiesAnyOf( - v -> - assertThat(v) - .isEqualTo( - "00000000-0000-0000-0000-000000000000"), - v -> assertThat(v).isEqualTo("UNKNOWN"))), + val.matches( + "\\s*00000000-0000-0000-0000-000000000000\\s*|UNKNOWN")), equalTo(RPC_SYSTEM, "aws-api"), equalTo(RPC_SERVICE, "Sqs"), equalTo(RPC_METHOD, "ReceiveMessage"), @@ -301,10 +155,7 @@ void assertSqsTraces(Boolean withParent, Boolean captureHeaders) { URL_FULL, v -> v.startsWith("http://localhost:" + sqsPort)), equalTo(SERVER_ADDRESS, "localhost"), equalTo(SERVER_PORT, sqsPort), - equalTo( - MESSAGING_SYSTEM, - MessagingIncubatingAttributes - .MessagingSystemIncubatingValues.AWS_SQS), + equalTo(MESSAGING_SYSTEM, AWS_SQS), equalTo(MESSAGING_DESTINATION_NAME, "testSdkSqs"), equalTo(MESSAGING_OPERATION, "receive"), equalTo(MESSAGING_BATCH_MESSAGE_COUNT, 1))); @@ -341,10 +192,7 @@ void assertSqsTraces(Boolean withParent, Boolean captureHeaders) { URL_FULL, v -> v.startsWith("http://localhost:" + sqsPort)), equalTo(SERVER_ADDRESS, "localhost"), equalTo(SERVER_PORT, sqsPort), - equalTo( - MESSAGING_SYSTEM, - MessagingIncubatingAttributes - .MessagingSystemIncubatingValues.AWS_SQS), + equalTo(MESSAGING_SYSTEM, AWS_SQS), equalTo(MESSAGING_DESTINATION_NAME, "testSdkSqs"), equalTo(MESSAGING_OPERATION, "process"), satisfies( @@ -354,7 +202,7 @@ void assertSqsTraces(Boolean withParent, Boolean captureHeaders) { attributes.add( satisfies( stringArrayKey("messaging.header.test_message_header"), - v -> v.isEqualTo(ImmutableList.of("test")))); + v -> v.isEqualTo(singletonList("test")))); } span.hasName("testSdkSqs process") @@ -378,24 +226,6 @@ void assertSqsTraces(Boolean withParent, Boolean captureHeaders) { }); } - @Test - void testSimpleSqsProducerConsumerServicesSync() throws URISyntaxException { - SqsClientBuilder builder = SqsClient.builder(); - configureSdkClient(builder); - SqsClient client = configureSqsClient(builder.build()); - - client.createQueue(createQueueRequest); - - client.sendMessage(sendMessageRequest); - - ReceiveMessageResponse response = client.receiveMessage(receiveMessageRequest); - - assertThat(response.messages().size()).isEqualTo(1); - - response.messages().forEach(message -> getTesting().runWithSpan("process child", () -> {})); - assertSqsTraces(false, false); - } - @Test void testCaptureMessageHeaderAsAttributeSpan() throws URISyntaxException { SqsClientBuilder builder = SqsClient.builder(); @@ -423,42 +253,6 @@ void testCaptureMessageHeaderAsAttributeSpan() throws URISyntaxException { assertSqsTraces(false, true); } - @Test - void testSimpleSqsProducerConsumerServicesWithParentSync() throws URISyntaxException { - SqsClientBuilder builder = SqsClient.builder(); - configureSdkClient(builder); - SqsClient client = configureSqsClient(builder.build()); - - client.createQueue(createQueueRequest); - client.sendMessage(sendMessageRequest); - - ReceiveMessageResponse response = - getTesting().runWithSpan("parent", () -> client.receiveMessage(receiveMessageRequest)); - - assertThat(response.messages().size()).isEqualTo(1); - - response.messages().forEach(message -> getTesting().runWithSpan("process child", () -> {})); - assertSqsTraces(true, false); - } - - @SuppressWarnings("InterruptedExceptionSwallowed") - @Test - void testSimpleSqsProducerConsumerServicesAsync() throws Exception { - SqsAsyncClientBuilder builder = SqsAsyncClient.builder(); - configureSdkClient(builder); - SqsAsyncClient client = configureSqsClient(builder.build()); - - client.createQueue(createQueueRequest).get(); - client.sendMessage(sendMessageRequest).get(); - - ReceiveMessageResponse response = client.receiveMessage(receiveMessageRequest).get(); - - assertThat(response.messages().size()).isEqualTo(1); - - response.messages().forEach(message -> getTesting().runWithSpan("process child", () -> {})); - assertSqsTraces(false, false); - } - @Test void testBatchSqsProducerConsumerServicesSync() throws URISyntaxException { SqsClientBuilder builder = SqsClient.builder(); @@ -489,38 +283,7 @@ void testBatchSqsProducerConsumerServicesSync() throws URISyntaxException { trace -> { publishSpan.set(trace.getSpan(0)); trace.hasSpansSatisfyingExactly( - span -> - span.hasName("testSdkSqs publish") - .hasKind(SpanKind.PRODUCER) - .hasNoParent() - .hasAttributesSatisfyingExactly( - equalTo(stringKey("aws.agent"), "java-aws-sdk"), - equalTo( - stringKey("aws.queue.url"), - "http://localhost:" + sqsPort + "/000000000000/testSdkSqs"), - satisfies( - AwsIncubatingAttributes.AWS_REQUEST_ID, - val -> - val.satisfiesAnyOf( - v -> - assertThat(v.trim()) - .isEqualTo( - "00000000-0000-0000-0000-000000000000"), - v -> assertThat(v.trim()).isEqualTo("UNKNOWN"))), - equalTo(RPC_SYSTEM, "aws-api"), - equalTo(RPC_SERVICE, "Sqs"), - equalTo(RPC_METHOD, "SendMessageBatch"), - equalTo(HTTP_REQUEST_METHOD, "POST"), - equalTo(HTTP_RESPONSE_STATUS_CODE, 200), - satisfies(URL_FULL, v -> v.startsWith("http://localhost:" + sqsPort)), - equalTo(SERVER_ADDRESS, "localhost"), - equalTo(SERVER_PORT, sqsPort), - equalTo( - MESSAGING_SYSTEM, - MessagingIncubatingAttributes.MessagingSystemIncubatingValues - .AWS_SQS), - equalTo(MESSAGING_DESTINATION_NAME, "testSdkSqs"), - equalTo(MESSAGING_OPERATION, "publish"))); + span -> publishSpan(span, queueUrl, "SendMessageBatch")); }, trace -> { List> spanAsserts = new ArrayList<>(); @@ -540,10 +303,7 @@ void testBatchSqsProducerConsumerServicesSync() throws URISyntaxException { satisfies(URL_FULL, v -> v.startsWith("http://localhost:" + sqsPort)), equalTo(SERVER_ADDRESS, "localhost"), equalTo(SERVER_PORT, sqsPort), - equalTo( - MESSAGING_SYSTEM, - MessagingIncubatingAttributes.MessagingSystemIncubatingValues - .AWS_SQS), + equalTo(MESSAGING_SYSTEM, AWS_SQS), equalTo(MESSAGING_DESTINATION_NAME, "testSdkSqs"), equalTo(MESSAGING_OPERATION, "receive"), equalTo(MESSAGING_BATCH_MESSAGE_COUNT, 3))); @@ -585,10 +345,7 @@ void testBatchSqsProducerConsumerServicesSync() throws URISyntaxException { v -> v.startsWith("http://localhost:" + sqsPort)), equalTo(SERVER_ADDRESS, "localhost"), equalTo(SERVER_PORT, sqsPort), - equalTo( - MESSAGING_SYSTEM, - MessagingIncubatingAttributes - .MessagingSystemIncubatingValues.AWS_SQS), + equalTo(MESSAGING_SYSTEM, AWS_SQS), equalTo(MESSAGING_DESTINATION_NAME, "testSdkSqs"), equalTo(MESSAGING_OPERATION, "process"), satisfies(