Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public void onEnd(
@Nullable Throwable error) {

if (recordMetadata != null) {
recordMetadata.serializedValueSize();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this change seems unrelated to opensearch

attributes.put(
MESSAGING_DESTINATION_PARTITION_ID, String.valueOf(recordMetadata.partition()));
attributes.put(MESSAGING_KAFKA_MESSAGE_OFFSET, recordMetadata.offset());
Expand Down
7 changes: 7 additions & 0 deletions instrumentation/opensearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,10 @@
| System property | Type | Default | Description |
| -------------------------------------------------------------- | ------- | ------- | --------------------------------------------------- |
| `otel.instrumentation.opensearch.experimental-span-attributes` | Boolean | `false` | Enable the capture of experimental span attributes. |

## Settings for the [OpenSearch Java Client](https://docs.opensearch.org/latest/clients/java/) instrumentation

| System property | Type | Default | Description |
|-----------------------------------------------------------------|---------| ------- |------------------------------------------------------|
| `otel.instrumentation.opensearch.experimental-span-attributes` | Boolean | `false` | Enable the capture of experimental span attributes. |
| `otel.instrumentation.opensearch.capture-search-query` | Boolean | `false` | Enable the capture of sanitized search query bodies. **Note**: Enabling this feature adds overhead for JSON serialization and parsing on search requests. |
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,20 @@ dependencies {
library("org.opensearch.client:opensearch-java:3.0.0")
compileOnly("com.google.auto.value:auto-value-annotations")
annotationProcessor("com.google.auto.value:auto-value")
implementation("com.fasterxml.jackson.core:jackson-databind")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be compileOnly. Implementation dependencies are embedded inside the agent, but your code seems to use the jackson-databind that is bundled inside the application.


testImplementation("org.opensearch.client:opensearch-rest-client:3.0.0")
testImplementation(project(":instrumentation:opensearch:opensearch-rest-common:testing"))
testInstrumentation(project(":instrumentation:apache-httpclient:apache-httpclient-5.0:javaagent"))

// For testing AwsSdk2Transport
// AwsSdk2Transport supports awssdk version 2.26.0
testInstrumentation(project(":instrumentation:apache-httpclient:apache-httpclient-4.0:javaagent"))
testInstrumentation(project(":instrumentation:netty:netty-4.1:javaagent"))
testImplementation("software.amazon.awssdk:auth:2.22.0")
testImplementation("software.amazon.awssdk:identity-spi:2.22.0")
testImplementation("software.amazon.awssdk:apache-client:2.22.0")
testImplementation("software.amazon.awssdk:netty-nio-client:2.22.0")
testImplementation("software.amazon.awssdk:regions:2.22.0")
testImplementation("software.amazon.awssdk:auth:2.26.0")
testImplementation("software.amazon.awssdk:identity-spi:2.26.0")
testImplementation("software.amazon.awssdk:apache-client:2.26.0")
testImplementation("software.amazon.awssdk:netty-nio-client:2.26.0")
testImplementation("software.amazon.awssdk:regions:2.26.0")
}

tasks {
Expand All @@ -39,14 +40,47 @@ tasks {
systemProperty("collectMetadata", findProperty("collectMetadata")?.toString() ?: "false")
}

test {
filter {
excludeTestsMatching("OpenSearchCaptureSearchQueryTest")
}
}

val testCaptureSearchQuery by registering(Test::class) {
testClassesDirs = sourceSets.test.get().output.classesDirs
classpath = sourceSets.test.get().runtimeClasspath

filter {
includeTestsMatching("OpenSearchCaptureSearchQueryTest")
}
jvmArgs("-Dotel.instrumentation.opensearch.capture-search-query=true")
}

val testStableSemconv by registering(Test::class) {
testClassesDirs = sourceSets.test.get().output.classesDirs
classpath = sourceSets.test.get().runtimeClasspath

filter {
excludeTestsMatching("OpenSearchCaptureSearchQueryTest")
}
jvmArgs("-Dotel.semconv-stability.opt-in=database")
systemProperty("metadataConfig", "otel.semconv-stability.opt-in=database")
}

val testCaptureSearchQueryStableSemconv by registering(Test::class) {
testClassesDirs = sourceSets.test.get().output.classesDirs
classpath = sourceSets.test.get().runtimeClasspath

filter {
includeTestsMatching("OpenSearchCaptureSearchQueryTest")
}
jvmArgs("-Dotel.instrumentation.opensearch.capture-search-query=true")
jvmArgs("-Dotel.semconv-stability.opt-in=database")
}

check {
dependsOn(testCaptureSearchQuery)
dependsOn(testStableSemconv)
dependsOn(testCaptureSearchQueryStableSemconv)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ public String getDbNamespace(OpenSearchRequest request) {
@Override
@Nullable
public String getDbQueryText(OpenSearchRequest request) {
return request.getMethod() + " " + request.getOperation();
// keep the previous logic in case of failure to extract the query body
if (request.getBody() == null) {
return request.getMethod() + " " + request.getOperation();
}
return request.getBody();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.opensearch.v3_0;

import static java.util.logging.Level.FINE;

import jakarta.json.stream.JsonGenerator;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import org.opensearch.client.json.JsonpMapper;
import org.opensearch.client.json.NdJsonpSerializable;
import org.opensearch.client.transport.GenericSerializable;

public final class OpenSearchBodyExtractor {

private static final Logger logger = Logger.getLogger(OpenSearchBodyExtractor.class.getName());

@Nullable
public static String extract(JsonpMapper mapper, Object request) {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();

if (request instanceof NdJsonpSerializable) {
writeNdJson(mapper, (NdJsonpSerializable) request, baos);
} else if (request instanceof GenericSerializable) {
((GenericSerializable) request).serialize(baos);
} else {
JsonGenerator generator = mapper.jsonProvider().createGenerator(baos);
mapper.serialize(request, generator);
generator.close();
}

String body = baos.toString(StandardCharsets.UTF_8);
return body.isEmpty() ? null : body;
} catch (RuntimeException e) {
logger.log(FINE, "Failure extracting body", e);
return null;
}
}

private static void writeNdJson(
JsonpMapper mapper, NdJsonpSerializable value, ByteArrayOutputStream baos) {
try {
Iterator<?> values = value._serializables();
while (values.hasNext()) {
Object item = values.next();
if (item instanceof NdJsonpSerializable && item != value) {
// do not recurse on the item itself
writeNdJson(mapper, (NdJsonpSerializable) item, baos);
} else {
JsonGenerator generator = mapper.jsonProvider().createGenerator(baos);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wondering whether instead of serializing the query to json string and then parsing that json, sanitizing it and serializing it again to json string you could wrap the JsonGenerator here so that it would replace literal values with ? so you wouldn't need to parse the json again.

mapper.serialize(item, generator);
generator.close();
baos.write('\n');
}
}
} catch (RuntimeException e) {
logger.log(FINE, "Failure serializing NdJson", e);
}
}

private OpenSearchBodyExtractor() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.opensearch.v3_0;

import static java.util.logging.Level.FINE;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

public final class OpenSearchBodySanitizer {

private static final Logger logger = Logger.getLogger(OpenSearchBodySanitizer.class.getName());

private static final ObjectMapper DEFAULT_OBJECT_MAPPER = new ObjectMapper();
private static final String MASKED_VALUE = "?";
private static final OpenSearchBodySanitizer DEFAULT_INSTANCE =
new OpenSearchBodySanitizer(DEFAULT_OBJECT_MAPPER);

private final ObjectMapper objectMapper;

private OpenSearchBodySanitizer(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}

public static String sanitize(String jsonString) {
return DEFAULT_INSTANCE.sanitizeInstance(jsonString);
}

public String sanitizeInstance(String jsonString) {
if (jsonString == null) {
return null;
}

List<String> queries = QuerySplitter.splitQueries(jsonString);
if (queries.isEmpty()) {
return null;
}

List<String> sanitizedQueries = new ArrayList<>();
for (String query : queries) {
String sanitized = sanitizeSingleQuery(query);
sanitizedQueries.add(sanitized);
}

return QuerySplitter.joinQueries(sanitizedQueries);
}

private String sanitizeSingleQuery(String query) {
try {
JsonNode rootNode = objectMapper.readTree(query);
JsonNode sanitizedNode = sanitizeNode(rootNode);
return objectMapper.writeValueAsString(sanitizedNode);
} catch (Exception e) {
logger.log(FINE, "Failure sanitizing single query", e);
return query;
}
}

private JsonNode sanitizeNode(JsonNode node) {
if (node == null || node.isNull()) {
return node;
}

if (node.isTextual()) {
return new TextNode(MASKED_VALUE);
}

if (node.isNumber() || node.isBoolean()) {
return new TextNode(MASKED_VALUE);
}

if (node.isArray()) {
ArrayNode arrayNode = objectMapper.createArrayNode();
for (JsonNode element : node) {
arrayNode.add(sanitizeNode(element));
}
return arrayNode;
}

if (node.isObject()) {
ObjectNode objectNode = objectMapper.createObjectNode();

for (Map.Entry<String, JsonNode> field : node.properties()) {
String key = field.getKey();
JsonNode value = field.getValue();

objectNode.set(key, sanitizeNode(value));
}
return objectNode;
}

return node;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,19 @@
package io.opentelemetry.javaagent.instrumentation.opensearch.v3_0;

import com.google.auto.value.AutoValue;
import javax.annotation.Nullable;

@AutoValue
public abstract class OpenSearchRequest {

public static OpenSearchRequest create(String method, String endpoint) {
return new AutoValue_OpenSearchRequest(method, endpoint);
public static OpenSearchRequest create(String method, String endpoint, @Nullable String body) {
return new AutoValue_OpenSearchRequest(method, endpoint, body);
}

public abstract String getMethod();

public abstract String getOperation();

@Nullable
public abstract String getBody();
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,15 @@
import io.opentelemetry.instrumentation.api.incubator.semconv.db.DbClientSpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.javaagent.bootstrap.internal.AgentInstrumentationConfig;

public final class OpenSearchSingletons {
private static final Instrumenter<OpenSearchRequest, Void> INSTRUMENTER = createInstrumenter();

public static final boolean CAPTURE_SEARCH_QUERY =
AgentInstrumentationConfig.get()
.getBoolean("otel.instrumentation.opensearch.capture-search-query", false);

public static Instrumenter<OpenSearchRequest, Void> instrumenter() {
return INSTRUMENTER;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.opensearch.client.json.JsonpMapper;
import org.opensearch.client.opensearch.core.MsearchRequest;
import org.opensearch.client.opensearch.core.SearchRequest;
import org.opensearch.client.transport.Endpoint;
import org.opensearch.client.transport.OpenSearchTransport;

public class OpenSearchTransportInstrumentation implements TypeInstrumentation {
@Override
Expand Down Expand Up @@ -60,10 +64,22 @@ private AdviceScope(OpenSearchRequest otelRequest, Context context, Scope scope)
}

@Nullable
public static AdviceScope start(Object request, Endpoint<Object, Object, Object> endpoint) {
public static AdviceScope start(
Object request, Endpoint<Object, Object, Object> endpoint, JsonpMapper jsonpMapper) {
Context parentContext = Context.current();

String queryBody = null;

if (OpenSearchSingletons.CAPTURE_SEARCH_QUERY
&& (request instanceof SearchRequest || request instanceof MsearchRequest)) {
String rawBody = OpenSearchBodyExtractor.extract(jsonpMapper, request);
queryBody = OpenSearchBodySanitizer.sanitize(rawBody);
}

OpenSearchRequest otelRequest =
OpenSearchRequest.create(endpoint.method(request), endpoint.requestUrl(request));
OpenSearchRequest.create(
endpoint.method(request), endpoint.requestUrl(request), queryBody);

if (!instrumenter().shouldStart(parentContext, otelRequest)) {
return null;
}
Expand Down Expand Up @@ -94,9 +110,10 @@ public static class PerformRequestAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static AdviceScope onEnter(
@Advice.This OpenSearchTransport openSearchTransport,
@Advice.Argument(0) Object request,
@Advice.Argument(1) Endpoint<Object, Object, Object> endpoint) {
return AdviceScope.start(request, endpoint);
return AdviceScope.start(request, endpoint, openSearchTransport.jsonpMapper());
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
Expand All @@ -114,9 +131,11 @@ public static class PerformRequestAsyncAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static Object[] onEnter(
@Advice.This OpenSearchTransport openSearchTransport,
@Advice.Argument(0) Object request,
@Advice.Argument(1) Endpoint<Object, Object, Object> endpoint) {
AdviceScope adviceScope = AdviceScope.start(request, endpoint);
AdviceScope adviceScope =
AdviceScope.start(request, endpoint, openSearchTransport.jsonpMapper());
return new Object[] {adviceScope};
}

Expand Down
Loading
Loading