diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaProducerAttributesExtractor.java b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaProducerAttributesExtractor.java index ba483967270d..88acbbbbf603 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaProducerAttributesExtractor.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaProducerAttributesExtractor.java @@ -53,6 +53,7 @@ public void onEnd( @Nullable Throwable error) { if (recordMetadata != null) { + recordMetadata.serializedValueSize(); attributes.put( MESSAGING_DESTINATION_PARTITION_ID, String.valueOf(recordMetadata.partition())); attributes.put(MESSAGING_KAFKA_MESSAGE_OFFSET, recordMetadata.offset()); diff --git a/instrumentation/opensearch/README.md b/instrumentation/opensearch/README.md index 8bc5f198736d..5289906d9e34 100644 --- a/instrumentation/opensearch/README.md +++ b/instrumentation/opensearch/README.md @@ -3,3 +3,10 @@ | System property | Type | Default | Description | | -------------------------------------------------------------- | ------- | ------- | --------------------------------------------------- | | `otel.instrumentation.opensearch.experimental-span-attributes` | Boolean | `false` | Enable the capture of experimental span attributes. | + +## Settings for the [OpenSearch Java Client](https://docs.opensearch.org/latest/clients/java/) instrumentation + +| System property | Type | Default | Description | +|-----------------------------------------------------------------|---------| ------- |------------------------------------------------------| +| `otel.instrumentation.opensearch.experimental-span-attributes` | Boolean | `false` | Enable the capture of experimental span attributes. | +| `otel.instrumentation.opensearch.capture-search-query` | Boolean | `false` | Enable the capture of sanitized search query bodies. **Note**: Enabling this feature adds overhead for JSON serialization and parsing on search requests. | diff --git a/instrumentation/opensearch/opensearch-java-3.0/javaagent/build.gradle.kts b/instrumentation/opensearch/opensearch-java-3.0/javaagent/build.gradle.kts index da36e2438828..f900e4b2f1a6 100644 --- a/instrumentation/opensearch/opensearch-java-3.0/javaagent/build.gradle.kts +++ b/instrumentation/opensearch/opensearch-java-3.0/javaagent/build.gradle.kts @@ -18,19 +18,20 @@ dependencies { library("org.opensearch.client:opensearch-java:3.0.0") compileOnly("com.google.auto.value:auto-value-annotations") annotationProcessor("com.google.auto.value:auto-value") + implementation("com.fasterxml.jackson.core:jackson-databind") testImplementation("org.opensearch.client:opensearch-rest-client:3.0.0") testImplementation(project(":instrumentation:opensearch:opensearch-rest-common:testing")) testInstrumentation(project(":instrumentation:apache-httpclient:apache-httpclient-5.0:javaagent")) - // For testing AwsSdk2Transport + // AwsSdk2Transport supports awssdk version 2.26.0 testInstrumentation(project(":instrumentation:apache-httpclient:apache-httpclient-4.0:javaagent")) testInstrumentation(project(":instrumentation:netty:netty-4.1:javaagent")) - testImplementation("software.amazon.awssdk:auth:2.22.0") - testImplementation("software.amazon.awssdk:identity-spi:2.22.0") - testImplementation("software.amazon.awssdk:apache-client:2.22.0") - testImplementation("software.amazon.awssdk:netty-nio-client:2.22.0") - testImplementation("software.amazon.awssdk:regions:2.22.0") + testImplementation("software.amazon.awssdk:auth:2.26.0") + testImplementation("software.amazon.awssdk:identity-spi:2.26.0") + testImplementation("software.amazon.awssdk:apache-client:2.26.0") + testImplementation("software.amazon.awssdk:netty-nio-client:2.26.0") + testImplementation("software.amazon.awssdk:regions:2.26.0") } tasks { @@ -39,14 +40,47 @@ tasks { systemProperty("collectMetadata", findProperty("collectMetadata")?.toString() ?: "false") } + test { + filter { + excludeTestsMatching("OpenSearchCaptureSearchQueryTest") + } + } + + val testCaptureSearchQuery by registering(Test::class) { + testClassesDirs = sourceSets.test.get().output.classesDirs + classpath = sourceSets.test.get().runtimeClasspath + + filter { + includeTestsMatching("OpenSearchCaptureSearchQueryTest") + } + jvmArgs("-Dotel.instrumentation.opensearch.capture-search-query=true") + } + val testStableSemconv by registering(Test::class) { testClassesDirs = sourceSets.test.get().output.classesDirs classpath = sourceSets.test.get().runtimeClasspath + + filter { + excludeTestsMatching("OpenSearchCaptureSearchQueryTest") + } jvmArgs("-Dotel.semconv-stability.opt-in=database") systemProperty("metadataConfig", "otel.semconv-stability.opt-in=database") } + val testCaptureSearchQueryStableSemconv by registering(Test::class) { + testClassesDirs = sourceSets.test.get().output.classesDirs + classpath = sourceSets.test.get().runtimeClasspath + + filter { + includeTestsMatching("OpenSearchCaptureSearchQueryTest") + } + jvmArgs("-Dotel.instrumentation.opensearch.capture-search-query=true") + jvmArgs("-Dotel.semconv-stability.opt-in=database") + } + check { + dependsOn(testCaptureSearchQuery) dependsOn(testStableSemconv) + dependsOn(testCaptureSearchQueryStableSemconv) } } diff --git a/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/OpenSearchAttributesGetter.java b/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/OpenSearchAttributesGetter.java index 7c2c9a0d1f51..5be00473c8f9 100644 --- a/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/OpenSearchAttributesGetter.java +++ b/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/OpenSearchAttributesGetter.java @@ -26,7 +26,11 @@ public String getDbNamespace(OpenSearchRequest request) { @Override @Nullable public String getDbQueryText(OpenSearchRequest request) { - return request.getMethod() + " " + request.getOperation(); + // keep the previous logic in case of failure to extract the query body + if (request.getBody() == null) { + return request.getMethod() + " " + request.getOperation(); + } + return request.getBody(); } @Override diff --git a/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/OpenSearchBodyExtractor.java b/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/OpenSearchBodyExtractor.java new file mode 100644 index 000000000000..10c88d5ab353 --- /dev/null +++ b/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/OpenSearchBodyExtractor.java @@ -0,0 +1,69 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.opensearch.v3_0; + +import static java.util.logging.Level.FINE; + +import jakarta.json.stream.JsonGenerator; +import java.io.ByteArrayOutputStream; +import java.nio.charset.StandardCharsets; +import java.util.Iterator; +import java.util.logging.Logger; +import javax.annotation.Nullable; +import org.opensearch.client.json.JsonpMapper; +import org.opensearch.client.json.NdJsonpSerializable; +import org.opensearch.client.transport.GenericSerializable; + +public final class OpenSearchBodyExtractor { + + private static final Logger logger = Logger.getLogger(OpenSearchBodyExtractor.class.getName()); + + @Nullable + public static String extract(JsonpMapper mapper, Object request) { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + if (request instanceof NdJsonpSerializable) { + writeNdJson(mapper, (NdJsonpSerializable) request, baos); + } else if (request instanceof GenericSerializable) { + ((GenericSerializable) request).serialize(baos); + } else { + JsonGenerator generator = mapper.jsonProvider().createGenerator(baos); + mapper.serialize(request, generator); + generator.close(); + } + + String body = baos.toString(StandardCharsets.UTF_8); + return body.isEmpty() ? null : body; + } catch (RuntimeException e) { + logger.log(FINE, "Failure extracting body", e); + return null; + } + } + + private static void writeNdJson( + JsonpMapper mapper, NdJsonpSerializable value, ByteArrayOutputStream baos) { + try { + Iterator values = value._serializables(); + while (values.hasNext()) { + Object item = values.next(); + if (item instanceof NdJsonpSerializable && item != value) { + // do not recurse on the item itself + writeNdJson(mapper, (NdJsonpSerializable) item, baos); + } else { + JsonGenerator generator = mapper.jsonProvider().createGenerator(baos); + mapper.serialize(item, generator); + generator.close(); + baos.write('\n'); + } + } + } catch (RuntimeException e) { + logger.log(FINE, "Failure serializing NdJson", e); + } + } + + private OpenSearchBodyExtractor() {} +} diff --git a/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/OpenSearchBodySanitizer.java b/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/OpenSearchBodySanitizer.java new file mode 100644 index 000000000000..d44c73ed9b9b --- /dev/null +++ b/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/OpenSearchBodySanitizer.java @@ -0,0 +1,104 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.opensearch.v3_0; + +import static java.util.logging.Level.FINE; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.logging.Logger; + +public final class OpenSearchBodySanitizer { + + private static final Logger logger = Logger.getLogger(OpenSearchBodySanitizer.class.getName()); + + private static final ObjectMapper DEFAULT_OBJECT_MAPPER = new ObjectMapper(); + private static final String MASKED_VALUE = "?"; + private static final OpenSearchBodySanitizer DEFAULT_INSTANCE = + new OpenSearchBodySanitizer(DEFAULT_OBJECT_MAPPER); + + private final ObjectMapper objectMapper; + + private OpenSearchBodySanitizer(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; + } + + public static String sanitize(String jsonString) { + return DEFAULT_INSTANCE.sanitizeInstance(jsonString); + } + + public String sanitizeInstance(String jsonString) { + if (jsonString == null) { + return null; + } + + List queries = QuerySplitter.splitQueries(jsonString); + if (queries.isEmpty()) { + return null; + } + + List sanitizedQueries = new ArrayList<>(); + for (String query : queries) { + String sanitized = sanitizeSingleQuery(query); + sanitizedQueries.add(sanitized); + } + + return QuerySplitter.joinQueries(sanitizedQueries); + } + + private String sanitizeSingleQuery(String query) { + try { + JsonNode rootNode = objectMapper.readTree(query); + JsonNode sanitizedNode = sanitizeNode(rootNode); + return objectMapper.writeValueAsString(sanitizedNode); + } catch (Exception e) { + logger.log(FINE, "Failure sanitizing single query", e); + return query; + } + } + + private JsonNode sanitizeNode(JsonNode node) { + if (node == null || node.isNull()) { + return node; + } + + if (node.isTextual()) { + return new TextNode(MASKED_VALUE); + } + + if (node.isNumber() || node.isBoolean()) { + return new TextNode(MASKED_VALUE); + } + + if (node.isArray()) { + ArrayNode arrayNode = objectMapper.createArrayNode(); + for (JsonNode element : node) { + arrayNode.add(sanitizeNode(element)); + } + return arrayNode; + } + + if (node.isObject()) { + ObjectNode objectNode = objectMapper.createObjectNode(); + + for (Map.Entry field : node.properties()) { + String key = field.getKey(); + JsonNode value = field.getValue(); + + objectNode.set(key, sanitizeNode(value)); + } + return objectNode; + } + + return node; + } +} diff --git a/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/OpenSearchRequest.java b/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/OpenSearchRequest.java index a7db9ceb21be..0b3705166cd0 100644 --- a/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/OpenSearchRequest.java +++ b/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/OpenSearchRequest.java @@ -6,15 +6,19 @@ package io.opentelemetry.javaagent.instrumentation.opensearch.v3_0; import com.google.auto.value.AutoValue; +import javax.annotation.Nullable; @AutoValue public abstract class OpenSearchRequest { - public static OpenSearchRequest create(String method, String endpoint) { - return new AutoValue_OpenSearchRequest(method, endpoint); + public static OpenSearchRequest create(String method, String endpoint, @Nullable String body) { + return new AutoValue_OpenSearchRequest(method, endpoint, body); } public abstract String getMethod(); public abstract String getOperation(); + + @Nullable + public abstract String getBody(); } diff --git a/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/OpenSearchSingletons.java b/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/OpenSearchSingletons.java index ef07442b65e8..a34a8e548da0 100644 --- a/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/OpenSearchSingletons.java +++ b/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/OpenSearchSingletons.java @@ -11,10 +11,15 @@ import io.opentelemetry.instrumentation.api.incubator.semconv.db.DbClientSpanNameExtractor; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor; +import io.opentelemetry.javaagent.bootstrap.internal.AgentInstrumentationConfig; public final class OpenSearchSingletons { private static final Instrumenter INSTRUMENTER = createInstrumenter(); + public static final boolean CAPTURE_SEARCH_QUERY = + AgentInstrumentationConfig.get() + .getBoolean("otel.instrumentation.opensearch.capture-search-query", false); + public static Instrumenter instrumenter() { return INSTRUMENTER; } diff --git a/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/OpenSearchTransportInstrumentation.java b/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/OpenSearchTransportInstrumentation.java index 6cf1cfe84993..f403e75d2414 100644 --- a/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/OpenSearchTransportInstrumentation.java +++ b/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/OpenSearchTransportInstrumentation.java @@ -21,7 +21,11 @@ import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; +import org.opensearch.client.json.JsonpMapper; +import org.opensearch.client.opensearch.core.MsearchRequest; +import org.opensearch.client.opensearch.core.SearchRequest; import org.opensearch.client.transport.Endpoint; +import org.opensearch.client.transport.OpenSearchTransport; public class OpenSearchTransportInstrumentation implements TypeInstrumentation { @Override @@ -60,10 +64,22 @@ private AdviceScope(OpenSearchRequest otelRequest, Context context, Scope scope) } @Nullable - public static AdviceScope start(Object request, Endpoint endpoint) { + public static AdviceScope start( + Object request, Endpoint endpoint, JsonpMapper jsonpMapper) { Context parentContext = Context.current(); + + String queryBody = null; + + if (OpenSearchSingletons.CAPTURE_SEARCH_QUERY + && (request instanceof SearchRequest || request instanceof MsearchRequest)) { + String rawBody = OpenSearchBodyExtractor.extract(jsonpMapper, request); + queryBody = OpenSearchBodySanitizer.sanitize(rawBody); + } + OpenSearchRequest otelRequest = - OpenSearchRequest.create(endpoint.method(request), endpoint.requestUrl(request)); + OpenSearchRequest.create( + endpoint.method(request), endpoint.requestUrl(request), queryBody); + if (!instrumenter().shouldStart(parentContext, otelRequest)) { return null; } @@ -94,9 +110,10 @@ public static class PerformRequestAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static AdviceScope onEnter( + @Advice.This OpenSearchTransport openSearchTransport, @Advice.Argument(0) Object request, @Advice.Argument(1) Endpoint endpoint) { - return AdviceScope.start(request, endpoint); + return AdviceScope.start(request, endpoint, openSearchTransport.jsonpMapper()); } @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) @@ -114,9 +131,11 @@ public static class PerformRequestAsyncAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static Object[] onEnter( + @Advice.This OpenSearchTransport openSearchTransport, @Advice.Argument(0) Object request, @Advice.Argument(1) Endpoint endpoint) { - AdviceScope adviceScope = AdviceScope.start(request, endpoint); + AdviceScope adviceScope = + AdviceScope.start(request, endpoint, openSearchTransport.jsonpMapper()); return new Object[] {adviceScope}; } diff --git a/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/QuerySplitter.java b/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/QuerySplitter.java new file mode 100644 index 000000000000..77d87f7e4a40 --- /dev/null +++ b/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/QuerySplitter.java @@ -0,0 +1,60 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.opensearch.v3_0; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Splits and joins queries for newline-delimited JSON (nd-json) format. Splits input by newlines + * and joins output with semicolons for display. + */ +class QuerySplitter { + + private static final String QUERY_SEPARATOR = "\n"; + private static final String QUERY_COMBINATOR = ";"; + + private QuerySplitter() {} + + /** + * Splits a string containing multiple queries separated by newlines. + * + * @param queriesString input string containing queries + * @return list of individual query strings, empty if input is null or empty + */ + static List splitQueries(String queriesString) { + if (queriesString == null || queriesString.trim().isEmpty()) { + return Collections.emptyList(); + } + + String[] queries = queriesString.split(QUERY_SEPARATOR, -1); + List result = new ArrayList<>(); + + for (String query : queries) { + String trimmed = query.trim(); + if (!trimmed.isEmpty()) { + result.add(trimmed); + } + } + + return result; + } + + /** + * Joins multiple sanitized queries back into a single string. + * + * @param sanitizedQueries list of sanitized query strings + * @return joined string with semicolon separator, or null if list is empty + */ + static String joinQueries(List sanitizedQueries) { + if (sanitizedQueries == null || sanitizedQueries.isEmpty()) { + return null; + } + + return String.join(QUERY_COMBINATOR, sanitizedQueries); + } +} diff --git a/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/AbstractOpenSearchTest.java b/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/AbstractOpenSearchTest.java index 63d522fe8c36..ed914b2cf09e 100644 --- a/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/AbstractOpenSearchTest.java +++ b/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/AbstractOpenSearchTest.java @@ -8,6 +8,7 @@ import static io.opentelemetry.instrumentation.testing.junit.db.DbClientMetricsTestUtil.assertDurationMetric; import static io.opentelemetry.instrumentation.testing.junit.db.SemconvStabilityUtil.maybeStable; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; import static io.opentelemetry.semconv.DbAttributes.DB_OPERATION_NAME; import static io.opentelemetry.semconv.DbAttributes.DB_SYSTEM_NAME; import static io.opentelemetry.semconv.HttpAttributes.HTTP_REQUEST_METHOD; @@ -36,7 +37,13 @@ import org.junit.jupiter.api.extension.RegisterExtension; import org.opensearch.client.opensearch.OpenSearchAsyncClient; import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.client.opensearch._types.mapping.TypeMapping; +import org.opensearch.client.opensearch._types.query_dsl.Query; import org.opensearch.client.opensearch.cluster.HealthResponse; +import org.opensearch.client.opensearch.core.IndexRequest; +import org.opensearch.client.opensearch.core.SearchRequest; +import org.opensearch.client.opensearch.core.SearchResponse; +import org.opensearch.client.opensearch.indices.CreateIndexRequest; import org.opensearch.testcontainers.OpensearchContainer; import org.testcontainers.utility.DockerImageName; @@ -44,6 +51,7 @@ @TestInstance(TestInstance.Lifecycle.PER_CLASS) abstract class AbstractOpenSearchTest { + protected static final String INDEX_NAME = "test-search-index"; protected OpenSearchClient openSearchClient; protected OpenSearchAsyncClient openSearchAsyncClient; protected OpensearchContainer opensearch; @@ -73,6 +81,32 @@ void setUp() throws Exception { httpHost = URI.create(opensearch.getHttpHostAddress()); openSearchClient = buildOpenSearchClient(); openSearchAsyncClient = buildOpenSearchAsyncClient(); + + String documentId = "test-doc-1"; + + // Create index + CreateIndexRequest createIndexRequest = + CreateIndexRequest.of( + c -> + c.index(INDEX_NAME) + .mappings( + TypeMapping.of( + t -> + t.properties( + "message", + p -> + p.text(txt -> txt.fielddata(true).analyzer("standard")))))); + + openSearchClient.indices().create(createIndexRequest); + + TestDocument testDocument = TestDocument.create(documentId, "test message for search"); + IndexRequest indexRequest = + new IndexRequest.Builder().index(INDEX_NAME).document(testDocument).build(); + + openSearchClient.index(indexRequest); + + // Wait for indexing to complete + openSearchClient.indices().refresh(r -> r.index(INDEX_NAME)); } @AfterAll @@ -171,4 +205,57 @@ void shouldRecordMetrics() throws IOException { assertDurationMetric( getTesting(), "io.opentelemetry.opensearch-java-3.0", DB_OPERATION_NAME, DB_SYSTEM_NAME); } + + @Test + void shouldNotCaptureSearchQueryBodyWhenDisabled() throws IOException { + // Execute search query with body + SearchRequest searchRequest = + SearchRequest.of( + s -> + s.index(INDEX_NAME) + .query( + Query.of( + q -> + q.match( + m -> m.field("message").query(v -> v.stringValue("test")))))); + + SearchResponse searchResponse = + openSearchClient.search(searchRequest, TestDocument.class); + assertThat(searchResponse.hits().total().value()).isGreaterThan(0); + + // Verify trace does NOT include query body, only method + operation + getTesting() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("POST") + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfyingExactly( + equalTo(maybeStable(DB_SYSTEM), "opensearch"), + equalTo(maybeStable(DB_OPERATION), "POST"), + // DB_STATEMENT should be method + operation, not JSON body + satisfies( + maybeStable(DB_STATEMENT), + statement -> + statement + .asString() + .startsWith("POST /" + INDEX_NAME + "/_search"))), + span -> + span.hasName("POST") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(NETWORK_PROTOCOL_VERSION, "1.1"), + equalTo(SERVER_ADDRESS, httpHost.getHost()), + equalTo(SERVER_PORT, httpHost.getPort()), + equalTo(HTTP_REQUEST_METHOD, "POST"), + satisfies( + URL_FULL, + url -> + url.asString() + .startsWith(httpHost + "/" + INDEX_NAME + "/_search")), + equalTo(HTTP_RESPONSE_STATUS_CODE, 200L), + equalTo(PEER_SERVICE, "test-peer-service")))); + } } diff --git a/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/OpenSearchAwsSdk2TransportTest.java b/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/OpenSearchAwsSdk2TransportTest.java index 13999ba6ad43..953aa4d141e0 100644 --- a/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/OpenSearchAwsSdk2TransportTest.java +++ b/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/OpenSearchAwsSdk2TransportTest.java @@ -28,11 +28,11 @@ import io.opentelemetry.testing.internal.armeria.common.HttpStatus; import io.opentelemetry.testing.internal.armeria.common.MediaType; import io.opentelemetry.testing.internal.armeria.testing.junit5.server.mock.MockWebServerExtension; +import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.opensearch.client.opensearch.OpenSearchAsyncClient; @@ -64,7 +64,7 @@ class OpenSearchAwsSdk2TransportTest extends AbstractOpenSearchTest { @BeforeAll @Override - void setUp() throws Exception { + void setUp() { server.start(); openSearchClient = buildOpenSearchClient(); openSearchAsyncClient = buildOpenSearchAsyncClient(); @@ -77,8 +77,7 @@ void tearDown() { server.stop(); } - @BeforeEach - void prepTest() { + void setupForHealthResponse() { server.beforeTestExecution(null); // Mock OpenSearch cluster health response @@ -106,13 +105,49 @@ void prepTest() { server.enqueue(HttpResponse.of(HttpStatus.OK, MediaType.JSON_UTF_8, healthResponse)); } + void setupForSearchResponse() { + server.beforeTestExecution(null); // Added this line + + // Mock OpenSearch Search response, matching the TestDocument class structure + String searchResponseJson = + "{\n" + + " \"took\": 5,\n" + + " \"timed_out\": false,\n" + + " \"_shards\": {\n" + + " \"total\": 1,\n" + + " \"successful\": 1,\n" + + " \"skipped\": 0,\n" + + " \"failed\": 0\n" + + " },\n" + + " \"hits\": {\n" + + " \"total\": {\n" + + " \"value\": 1,\n" + + " \"relation\": \"eq\"\n" + + " },\n" + + " \"max_score\": 1.0,\n" + + " \"hits\": [\n" + + " {\n" + + " \"_index\": \"my_index\",\n" + + " \"_id\": \"1\",\n" + + " \"_score\": 1.0,\n" + + " \"_source\": {\n" + + " \"id\": \"doc-1\",\n" // Corrected field + + " \"message\": \"This is a test document.\"\n" // Corrected field + + " }\n" + + " }\n" + + " ]\n" + + " }\n" + + "}"; + server.enqueue(HttpResponse.of(HttpStatus.OK, MediaType.JSON_UTF_8, searchResponseJson)); + } + @Override protected InstrumentationExtension getTesting() { return testing; } @Override - protected OpenSearchClient buildOpenSearchClient() throws Exception { + protected OpenSearchClient buildOpenSearchClient() { SdkHttpClient httpClient = ApacheHttpClient.builder() .buildWithDefaults( @@ -131,7 +166,7 @@ protected OpenSearchClient buildOpenSearchClient() throws Exception { } @Override - protected OpenSearchAsyncClient buildOpenSearchAsyncClient() throws Exception { + protected OpenSearchAsyncClient buildOpenSearchAsyncClient() { SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder() .buildWithDefaults( @@ -149,9 +184,17 @@ protected OpenSearchAsyncClient buildOpenSearchAsyncClient() throws Exception { return new OpenSearchAsyncClient(transport); } + @Test + @Override + void shouldGetStatusWithTraces() throws IOException { + setupForHealthResponse(); + super.shouldGetStatusWithTraces(); + } + @Test @Override void shouldGetStatusAsyncWithTraces() throws Exception { + setupForHealthResponse(); CountDownLatch countDownLatch = new CountDownLatch(1); CompletableFuture responseCompletableFuture = @@ -193,19 +236,27 @@ void shouldGetStatusAsyncWithTraces() throws Exception { equalTo(SERVER_PORT, httpHost.getPort()), equalTo(HTTP_REQUEST_METHOD, "GET"), equalTo(URL_FULL, httpHost + "/_cluster/health"), - equalTo( - NETWORK_PEER_ADDRESS, - httpHost.getHost()), // Netty 4.1 Instrumentation collects - // NETWORK_PEER_ADDRESS - equalTo( - NETWORK_PEER_PORT, - httpHost.getPort()), // Netty 4.1 Instrumentation collects - // NETWORK_PEER_PORT equalTo(HTTP_RESPONSE_STATUS_CODE, 200L), - equalTo(PEER_SERVICE, "test-peer-service")), + equalTo(PEER_SERVICE, "test-peer-service"), + equalTo(NETWORK_PEER_ADDRESS, server.httpsEndpoint().host()), + equalTo(NETWORK_PEER_PORT, server.httpsPort())), span -> span.hasName("callback") .hasKind(SpanKind.INTERNAL) .hasParent(trace.getSpan(1)))); } + + @Test + @Override + void shouldRecordMetrics() throws IOException { + setupForHealthResponse(); + super.shouldRecordMetrics(); + } + + @Test + @Override + void shouldNotCaptureSearchQueryBodyWhenDisabled() throws IOException { + setupForSearchResponse(); + super.shouldNotCaptureSearchQueryBodyWhenDisabled(); + } } diff --git a/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/OpenSearchCaptureSearchQueryTest.java b/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/OpenSearchCaptureSearchQueryTest.java new file mode 100644 index 000000000000..01af90df888b --- /dev/null +++ b/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/OpenSearchCaptureSearchQueryTest.java @@ -0,0 +1,311 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.opensearch.v3_0; + +import static io.opentelemetry.instrumentation.testing.junit.db.SemconvStabilityUtil.maybeStable; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static io.opentelemetry.semconv.HttpAttributes.HTTP_REQUEST_METHOD; +import static io.opentelemetry.semconv.HttpAttributes.HTTP_RESPONSE_STATUS_CODE; +import static io.opentelemetry.semconv.NetworkAttributes.NETWORK_PROTOCOL_VERSION; +import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS; +import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT; +import static io.opentelemetry.semconv.UrlAttributes.URL_FULL; +import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DB_OPERATION; +import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DB_STATEMENT; +import static io.opentelemetry.semconv.incubating.DbIncubatingAttributes.DB_SYSTEM; +import static io.opentelemetry.semconv.incubating.PeerIncubatingAttributes.PEER_SERVICE; +import static org.assertj.core.api.Assertions.assertThat; + +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import java.io.IOException; +import java.net.URI; +import javax.net.ssl.SSLContext; +import org.apache.hc.client5.http.auth.AuthScope; +import org.apache.hc.client5.http.auth.UsernamePasswordCredentials; +import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder; +import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder; +import org.apache.hc.client5.http.ssl.NoopHostnameVerifier; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.nio.ssl.TlsStrategy; +import org.apache.hc.core5.ssl.SSLContexts; +import org.apache.hc.core5.ssl.TrustStrategy; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.client.opensearch._types.mapping.TypeMapping; +import org.opensearch.client.opensearch._types.query_dsl.Query; +import org.opensearch.client.opensearch.core.IndexRequest; +import org.opensearch.client.opensearch.core.MsearchRequest; +import org.opensearch.client.opensearch.core.MsearchResponse; +import org.opensearch.client.opensearch.core.SearchRequest; +import org.opensearch.client.opensearch.core.SearchResponse; +import org.opensearch.client.opensearch.indices.CreateIndexRequest; +import org.opensearch.client.transport.OpenSearchTransport; +import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder; +import org.opensearch.testcontainers.OpensearchContainer; +import org.testcontainers.utility.DockerImageName; + +/** + * Tests for capture-search-query=true configuration. This test class runs with + * -Dotel.instrumentation.opensearch.capture-search-query=true and verifies that query bodies are + * captured in DB_STATEMENT. + */ +@SuppressWarnings("deprecation") // using deprecated semconv +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +class OpenSearchCaptureSearchQueryTest { + + private static final String INDEX_NAME = "test-search-index"; + private OpenSearchClient openSearchClient; + private OpensearchContainer opensearch; + private URI httpHost; + + @RegisterExtension + static final AgentInstrumentationExtension testing = AgentInstrumentationExtension.create(); + + protected InstrumentationExtension getTesting() { + return testing; + } + + @BeforeAll + void setUp() throws Exception { + opensearch = + new OpensearchContainer(DockerImageName.parse("opensearchproject/opensearch:1.3.6")) + .withSecurityEnabled(); + opensearch.withEnv( + "OPENSEARCH_JAVA_OPTS", + "-Xmx256m -Xms256m -Dlog4j2.disableJmx=true -Dlog4j2.disable.jmx=true -XX:-UseContainerSupport"); + opensearch.start(); + httpHost = URI.create(opensearch.getHttpHostAddress()); + openSearchClient = buildOpenSearchClient(); + + String documentId = "test-doc-1"; + + CreateIndexRequest createIndexRequest = + CreateIndexRequest.of( + c -> + c.index(INDEX_NAME) + .mappings( + TypeMapping.of( + t -> + t.properties( + "message", + p -> + p.text(txt -> txt.fielddata(true).analyzer("standard")))))); + + openSearchClient.indices().create(createIndexRequest); + + TestDocument testDocument = TestDocument.create(documentId, "test message for search"); + IndexRequest indexRequest = + new IndexRequest.Builder().index(INDEX_NAME).document(testDocument).build(); + + openSearchClient.index(indexRequest); + openSearchClient.indices().refresh(r -> r.index(INDEX_NAME)); + } + + private OpenSearchClient buildOpenSearchClient() throws Exception { + HttpHost host = new HttpHost("https", httpHost.getHost(), httpHost.getPort()); + + TrustStrategy acceptingTrustStrategy = (certificate, authType) -> true; + SSLContext sslContext = + SSLContexts.custom().loadTrustMaterial(null, acceptingTrustStrategy).build(); + TlsStrategy tlsStrategy = + ClientTlsStrategyBuilder.create() + .setHostnameVerifier(NoopHostnameVerifier.INSTANCE) + .setSslContext(sslContext) + .build(); + PoolingAsyncClientConnectionManager connectionManager = + PoolingAsyncClientConnectionManagerBuilder.create().setTlsStrategy(tlsStrategy).build(); + + BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials( + new AuthScope(null, -1), + new UsernamePasswordCredentials( + opensearch.getUsername(), opensearch.getPassword().toCharArray())); + + OpenSearchTransport apacheHttpClient5Transport = + ApacheHttpClient5TransportBuilder.builder(host) + .setHttpClientConfigCallback( + httpClientBuilder -> + httpClientBuilder + .setDefaultCredentialsProvider(credentialsProvider) + .setConnectionManager(connectionManager) + .setDefaultCredentialsProvider(credentialsProvider)) + .build(); + return new OpenSearchClient(apacheHttpClient5Transport); + } + + @AfterAll + void tearDown() { + opensearch.stop(); + } + + @Test + void shouldCaptureSearchQueryBody() throws IOException { + SearchRequest searchRequest = + SearchRequest.of( + s -> + s.index(INDEX_NAME) + .query( + Query.of( + q -> + q.match( + m -> m.field("message").query(v -> v.stringValue("test")))))); + + SearchResponse searchResponse = + openSearchClient.search(searchRequest, TestDocument.class); + assertThat(searchResponse.hits().total().value()).isGreaterThan(0); + + getTesting() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("POST") + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfyingExactly( + equalTo(maybeStable(DB_SYSTEM), "opensearch"), + equalTo(maybeStable(DB_OPERATION), "POST"), + equalTo( + maybeStable(DB_STATEMENT), + "{\"query\":{\"match\":{\"message\":{\"query\":\"?\"}}}}")), + span -> + span.hasName("POST") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(NETWORK_PROTOCOL_VERSION, "1.1"), + equalTo(SERVER_ADDRESS, httpHost.getHost()), + equalTo(SERVER_PORT, httpHost.getPort()), + equalTo(HTTP_REQUEST_METHOD, "POST"), + satisfies( + URL_FULL, + url -> + url.asString() + .startsWith(httpHost + "/" + INDEX_NAME + "/_search")), + equalTo(HTTP_RESPONSE_STATUS_CODE, 200L), + equalTo(PEER_SERVICE, "test-peer-service")))); + } + + @Test + void shouldCaptureMsearchQueryBody() throws IOException { + MsearchRequest msearchRequest = + new MsearchRequest.Builder() + .searches( + s -> + s.header(h -> h.index(INDEX_NAME)) + .body( + b -> + b.query( + q -> + q.term( + t -> + t.field("message") + .value(v -> v.stringValue("message")))))) + .searches( + s -> + s.header(h -> h.index(INDEX_NAME)) + .body( + b -> + b.query( + q -> + q.term( + t -> + t.field("message2") + .value(v -> v.longValue(100L)))))) + .searches( + s -> + s.header(h -> h.index(INDEX_NAME)) + .body( + b -> + b.query( + q -> + q.term( + t -> + t.field("message3") + .value(v -> v.booleanValue(true)))))) + .build(); + + MsearchResponse msearchResponse = + openSearchClient.msearch(msearchRequest, TestDocument.class); + assertThat(msearchResponse.responses().size()).isGreaterThan(0); + + getTesting() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("POST") + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfyingExactly( + equalTo(maybeStable(DB_SYSTEM), "opensearch"), + equalTo(maybeStable(DB_OPERATION), "POST"), + equalTo( + maybeStable(DB_STATEMENT), + "{\"index\":[\"?\"]};{\"query\":{\"term\":{\"message\":{\"value\":\"?\"}}}};{\"index\":[\"?\"]};{\"query\":{\"term\":{\"message2\":{\"value\":\"?\"}}}};{\"index\":[\"?\"]};{\"query\":{\"term\":{\"message3\":{\"value\":\"?\"}}}}")), + span -> + span.hasName("POST") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(NETWORK_PROTOCOL_VERSION, "1.1"), + equalTo(SERVER_ADDRESS, httpHost.getHost()), + equalTo(SERVER_PORT, httpHost.getPort()), + equalTo(HTTP_REQUEST_METHOD, "POST"), + satisfies( + URL_FULL, + url -> + url.asString() + .startsWith( + httpHost + "/" + "_msearch?typed_keys=true")), + equalTo(HTTP_RESPONSE_STATUS_CODE, 200L), + equalTo(PEER_SERVICE, "test-peer-service")))); + } + + @Test + void shouldNotCaptureIndexQueryBody() throws IOException { + TestDocument testDocument = TestDocument.create("test-doc-2", "index body test message"); + IndexRequest indexRequest = + new IndexRequest.Builder().index(INDEX_NAME).document(testDocument).build(); + + openSearchClient.index(indexRequest); + + getTesting() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("POST") + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfyingExactly( + equalTo(maybeStable(DB_SYSTEM), "opensearch"), + equalTo(maybeStable(DB_OPERATION), "POST"), + equalTo(maybeStable(DB_STATEMENT), "POST /test-search-index/_doc")), + span -> + span.hasName("POST") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(NETWORK_PROTOCOL_VERSION, "1.1"), + equalTo(SERVER_ADDRESS, httpHost.getHost()), + equalTo(SERVER_PORT, httpHost.getPort()), + equalTo(HTTP_REQUEST_METHOD, "POST"), + satisfies( + URL_FULL, + url -> + url.asString() + .startsWith(httpHost + "/" + INDEX_NAME + "/_doc")), + equalTo(HTTP_RESPONSE_STATUS_CODE, 201L), + equalTo(PEER_SERVICE, "test-peer-service")))); + } +} diff --git a/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/TestDocument.java b/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/TestDocument.java new file mode 100644 index 000000000000..85fe78fb3d7b --- /dev/null +++ b/instrumentation/opensearch/opensearch-java-3.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/opensearch/v3_0/TestDocument.java @@ -0,0 +1,30 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.opensearch.v3_0; + +public class TestDocument { + private String id; + private String message; + + public TestDocument() {} + + public TestDocument(String id, String message) { + this.id = id; + this.message = message; + } + + public static TestDocument create(String id, String message) { + return new TestDocument(id, message); + } + + public String getId() { + return id; + } + + public String getMessage() { + return message; + } +}