Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.mongodb.bulk.InsertRequest;
import com.mongodb.bulk.UpdateRequest;
import com.mongodb.bulk.WriteRequest;
import com.mongodb.operation.AggregateOperation;
import com.mongodb.operation.CountOperation;
import com.mongodb.operation.CreateCollectionOperation;
import com.mongodb.operation.CreateIndexesOperation;
Expand Down Expand Up @@ -104,6 +105,9 @@ public static String getTraceParam(Object obj) {
} else if (obj instanceof FindAndUpdateOperation) {
BsonDocument filter = ((FindAndUpdateOperation) obj).getFilter();
return limitFilter(filter.toString());
} else if (obj instanceof AggregateOperation) {
List<BsonDocument> pipelines = ((AggregateOperation) obj).getPipeline();
return getPipelines(pipelines);
} else if (obj instanceof MapReduceToCollectionOperation) {
BsonDocument filter = ((MapReduceToCollectionOperation) obj).getFilter();
return limitFilter(filter.toString());
Expand All @@ -115,6 +119,18 @@ public static String getTraceParam(Object obj) {
}
}

private static String getPipelines(List<BsonDocument> pipelines) {
StringBuilder params = new StringBuilder();
for (BsonDocument pipeline : pipelines) {
params.append(pipeline.toString()).append(",");
final int filterLengthLimit = MongoPluginConfig.Plugin.MongoDB.FILTER_LENGTH_LIMIT;
if (filterLengthLimit > 0 && params.length() > filterLengthLimit) {
return params.substring(0, filterLengthLimit) + "...";
}
}
return params.toString();
}

private static String getFilter(List<? extends WriteRequest> writeRequestList) {
StringBuilder params = new StringBuilder();
for (WriteRequest request : writeRequestList) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,101 @@

package org.apache.skywalking.apm.plugin.mongodb.v3.support;

import com.mongodb.MongoNamespace;
import lombok.SneakyThrows;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.tag.AbstractTag;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import org.apache.skywalking.apm.plugin.mongodb.v3.MongoPluginConfig;
import org.apache.skywalking.apm.util.StringUtil;

import java.lang.reflect.Field;

public class MongoSpanHelper {

private static final AbstractTag<String> DB_COLLECTION_TAG = Tags.ofKey("db.collection");

private MongoSpanHelper() {
}

@SneakyThrows
public static void createExitSpan(String executeMethod, String remotePeer, Object operation) {
AbstractSpan span = ContextManager.createExitSpan(
MongoConstants.MONGO_DB_OP_PREFIX + executeMethod, new ContextCarrier(), remotePeer);
MongoConstants.MONGO_DB_OP_PREFIX + executeMethod, new ContextCarrier(), remotePeer);
span.setComponent(ComponentsDefine.MONGO_DRIVER);
Tags.DB_TYPE.set(span, MongoConstants.DB_TYPE);
SpanLayer.asDB(span);

Field[] declaredFields = operation.getClass().getDeclaredFields();
MongoNamespace namespace = tryToGetMongoNamespace(operation, declaredFields);
if (namespace != null) {
extractTagsFromNamespace(span, namespace);
}

if (MongoPluginConfig.Plugin.MongoDB.TRACE_PARAM) {
Tags.DB_BIND_VARIABLES.set(span, MongoOperationHelper.getTraceParam(operation));
}
}

private static void extractTagsFromNamespace(AbstractSpan span, MongoNamespace namespace) {
Tags.DB_INSTANCE.set(span, namespace.getDatabaseName());
if (StringUtil.isNotEmpty(namespace.getCollectionName())) {
span.tag(DB_COLLECTION_TAG, namespace.getCollectionName());
}
}

private static MongoNamespace tryToGetMongoNamespace(Object operation, Field[] declaredFields) throws IllegalAccessException {
Field namespaceField = null;
Field wrappedField = null;
Field databaseField = null;
Field collectionField = null;
for (Field field : declaredFields) {
if ("namespace".equals(field.getName())) {
namespaceField = field;
Field.setAccessible(new Field[]{field}, true);
}
if ("wrapped".equals(field.getName())) {
wrappedField = field;
Field.setAccessible(new Field[]{field}, true);
}
if ("databaseName".equals(field.getName())) {
databaseField = field;
Field.setAccessible(new Field[]{field}, true);
}
if ("collectionName".equals(field.getName())) {
collectionField = field;
Field.setAccessible(new Field[]{field}, true);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain this? The more reflect we used, the more performance we impacted.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the review. I have made the changes using SkyWalkingDynamicField based on your feedback.

}
if (namespaceField != null) {
return (MongoNamespace) namespaceField.get(operation);
}
String database = null;
String collection = null;
if (databaseField != null) {
database = (String) databaseField.get(operation);
}
if (collectionField != null) {
collection = (String) collectionField.get(operation);
}
if (database != null && collection == null) {
return new MongoNamespace(database);
}
if (database != null && collection != null) {
return new MongoNamespace(database, collection);
}
if (wrappedField != null) {
Object wrapped = wrappedField.get(operation);
if (wrapped != null && wrapped != operation) {
Field[] declaredFieldsInWrapped = wrapped.getClass().getDeclaredFields();
return tryToGetMongoNamespace(wrapped, declaredFieldsInWrapped);
}
}
return null;

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List;

import com.mongodb.operation.AggregateOperation;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
import org.apache.skywalking.apm.agent.core.context.trace.LogDataEntity;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
Expand Down Expand Up @@ -70,6 +73,7 @@ public class MongoDBInterceptorTest {
@Mock
private EnhancedInstance enhancedInstance;

private Decoder decoder;
private Object[] arguments;
private Class[] argumentTypes;

Expand All @@ -89,7 +93,7 @@ public void setUp() throws Exception {
BsonDocument document = new BsonDocument();
document.append("name", new BsonString("by"));
MongoNamespace mongoNamespace = new MongoNamespace("test.user");
Decoder decoder = mock(Decoder.class);
decoder = mock(Decoder.class);
FindOperation findOperation = new FindOperation(mongoNamespace, decoder);
findOperation.filter(document);

Expand All @@ -108,6 +112,24 @@ public void testIntercept() throws Throwable {
assertRedisSpan(spans.get(0));
}

@Test
public void testAggregateOperationIntercept() throws Throwable {
MongoNamespace mongoNamespace = new MongoNamespace("test.user");
BsonDocument matchStage = new BsonDocument("$match", new BsonDocument("name", new BsonString("by")));
List<BsonDocument> pipeline = Collections.singletonList(matchStage);
AggregateOperation<BsonDocument> aggregateOperation = new AggregateOperation(mongoNamespace, pipeline, decoder);
Object[] arguments = {aggregateOperation};
Class[] argumentTypes = {aggregateOperation.getClass()};

interceptor.beforeMethod(enhancedInstance, getExecuteMethod(), arguments, argumentTypes, null);
interceptor.afterMethod(enhancedInstance, getExecuteMethod(), arguments, argumentTypes, null);

MatcherAssert.assertThat(segmentStorage.getTraceSegments().size(), is(1));
TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
assertMongoAggregateOperationSpan(spans.get(0));
}

@Test
public void testInterceptWithException() throws Throwable {
interceptor.beforeMethod(enhancedInstance, getExecuteMethod(), arguments, argumentTypes, null);
Expand All @@ -128,8 +150,22 @@ private void assertRedisSpan(AbstractTracingSpan span) {
assertThat(span.getOperationName(), is("MongoDB/FindOperation"));
assertThat(SpanHelper.getComponentId(span), is(42));
List<TagValuePair> tags = SpanHelper.getTags(span);
assertThat(tags.get(1).getValue(), is("{\"name\": \"by\"}"));
assertThat(tags.get(0).getValue(), is("MongoDB"));
assertThat(tags.get(1).getValue(), is("test"));
assertThat(tags.get(2).getValue(), is("user"));
assertThat(tags.get(3).getValue(), is("{\"name\": \"by\"}"));
assertThat(span.isExit(), is(true));
assertThat(SpanHelper.getLayer(span), CoreMatchers.is(SpanLayer.DB));
}

private void assertMongoAggregateOperationSpan(AbstractTracingSpan span) {
assertThat(span.getOperationName(), is("MongoDB/AggregateOperation"));
assertThat(SpanHelper.getComponentId(span), is(42));
List<TagValuePair> tags = SpanHelper.getTags(span);
assertThat(tags.get(0).getValue(), is("MongoDB"));
assertThat(tags.get(1).getValue(), is("test"));
assertThat(tags.get(2).getValue(), is("user"));
assertThat(tags.get(3).getValue(), is("{\"$match\": {\"name\": \"by\"}},"));
assertThat(span.isExit(), is(true));
assertThat(SpanHelper.getLayer(span), CoreMatchers.is(SpanLayer.DB));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@

package org.apache.skywalking.apm.plugin.mongodb.v3.interceptor.v37;

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.lang.reflect.Method;
import java.util.List;
import com.mongodb.MongoNamespace;
import com.mongodb.ReadConcern;
import com.mongodb.client.internal.OperationExecutor;
import com.mongodb.operation.AggregateOperation;
import com.mongodb.operation.CreateCollectionOperation;
import com.mongodb.operation.FindOperation;
import com.mongodb.operation.WriteOperation;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
import org.apache.skywalking.apm.agent.core.context.trace.LogDataEntity;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
Expand All @@ -50,11 +51,15 @@
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import com.mongodb.MongoNamespace;
import com.mongodb.ReadConcern;
import com.mongodb.client.internal.OperationExecutor;
import com.mongodb.operation.FindOperation;
import com.mongodb.operation.WriteOperation;

import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List;

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@RunWith(TracingSegmentRunner.class)
public class MongoDBOperationExecutorInterceptorTest {
Expand All @@ -75,6 +80,8 @@ public class MongoDBOperationExecutorInterceptorTest {
private Object[] arguments;

private Class[] argumentTypes;
private Decoder decoder;
private MongoNamespace mongoNamespace;

@Before
public void setUp() {
Expand All @@ -87,11 +94,10 @@ public void setUp() {

BsonDocument document = new BsonDocument();
document.append("name", new BsonString("by"));
MongoNamespace mongoNamespace = new MongoNamespace("test.user");
Decoder decoder = mock(Decoder.class);
mongoNamespace = new MongoNamespace("test.user");
decoder = mock(Decoder.class);
FindOperation findOperation = new FindOperation(mongoNamespace, decoder);
findOperation.filter(document);

arguments = new Object[] {findOperation};
argumentTypes = new Class[] {findOperation.getClass()};
}
Expand All @@ -104,11 +110,42 @@ public void testIntercept() throws Throwable {
MatcherAssert.assertThat(segmentStorage.getTraceSegments().size(), is(1));
TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
assertRedisSpan(spans.get(0));
assertMongoFindOperationSpan(spans.get(0));
}

@Test
public void testInterceptWithException() throws Throwable {
public void testCreateCollectionOperationIntercept() throws Throwable {
CreateCollectionOperation createCollectionOperation = new CreateCollectionOperation("test", "user");
Object[] arguments = {createCollectionOperation};
Class[] argumentTypes = {createCollectionOperation.getClass()};
interceptor.beforeMethod(enhancedInstance, getMethod(), arguments, argumentTypes, null);
interceptor.afterMethod(enhancedInstance, getMethod(), arguments, argumentTypes, null);

MatcherAssert.assertThat(segmentStorage.getTraceSegments().size(), is(1));
TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
assertMongoCreateCollectionOperationSpan(spans.get(0));
}

@Test
public void testAggregateOperationIntercept() throws Throwable {
MongoNamespace mongoNamespace = new MongoNamespace("test.user");
BsonDocument matchStage = new BsonDocument("$match", new BsonDocument("name", new BsonString("by")));
List<BsonDocument> pipeline = Collections.singletonList(matchStage);
AggregateOperation<BsonDocument> aggregateOperation = new AggregateOperation(mongoNamespace, pipeline, decoder);
Object[] arguments = {aggregateOperation};
Class[] argumentTypes = {aggregateOperation.getClass()};
interceptor.beforeMethod(enhancedInstance, getMethod(), arguments, argumentTypes, null);
interceptor.afterMethod(enhancedInstance, getMethod(), arguments, argumentTypes, null);

MatcherAssert.assertThat(segmentStorage.getTraceSegments().size(), is(1));
TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
assertMongoAggregateOperationSpan(spans.get(0));
}

@Test
public void testInterceptFindOperationWithException() throws Throwable {
interceptor.beforeMethod(enhancedInstance, getMethod(), arguments, argumentTypes, null);
interceptor.handleMethodException(
enhancedInstance, getMethod(), arguments, argumentTypes, new RuntimeException());
Expand All @@ -117,18 +154,44 @@ public void testInterceptWithException() throws Throwable {
MatcherAssert.assertThat(segmentStorage.getTraceSegments().size(), is(1));
TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
List<AbstractTracingSpan> spans = SegmentHelper.getSpans(traceSegment);
assertRedisSpan(spans.get(0));
assertMongoFindOperationSpan(spans.get(0));
List<LogDataEntity> logDataEntities = SpanHelper.getLogs(spans.get(0));
assertThat(logDataEntities.size(), is(1));
SpanAssert.assertException(logDataEntities.get(0), RuntimeException.class);
}

private void assertRedisSpan(AbstractTracingSpan span) {
private void assertMongoCreateCollectionOperationSpan(AbstractTracingSpan span) {
assertThat(span.getOperationName(), is("MongoDB/CreateCollectionOperation"));
assertThat(SpanHelper.getComponentId(span), is(42));
List<TagValuePair> tags = SpanHelper.getTags(span);
assertThat(tags.get(0).getValue(), is("MongoDB"));
assertThat(tags.get(1).getValue(), is("test"));
assertThat(tags.get(2).getValue(), is("user"));
assertThat(tags.get(3).getValue(), is("user"));
assertThat(span.isExit(), is(true));
assertThat(SpanHelper.getLayer(span), CoreMatchers.is(SpanLayer.DB));
}

private void assertMongoAggregateOperationSpan(AbstractTracingSpan span) {
assertThat(span.getOperationName(), is("MongoDB/AggregateOperation"));
assertThat(SpanHelper.getComponentId(span), is(42));
List<TagValuePair> tags = SpanHelper.getTags(span);
assertThat(tags.get(0).getValue(), is("MongoDB"));
assertThat(tags.get(1).getValue(), is("test"));
assertThat(tags.get(2).getValue(), is("user"));
assertThat(tags.get(3).getValue(), is("{\"$match\": {\"name\": \"by\"}},"));
assertThat(span.isExit(), is(true));
assertThat(SpanHelper.getLayer(span), CoreMatchers.is(SpanLayer.DB));
}

private void assertMongoFindOperationSpan(AbstractTracingSpan span) {
assertThat(span.getOperationName(), is("MongoDB/FindOperation"));
assertThat(SpanHelper.getComponentId(span), is(42));
List<TagValuePair> tags = SpanHelper.getTags(span);
assertThat(tags.get(1).getValue(), is("{\"name\": \"by\"}"));
assertThat(tags.get(0).getValue(), is("MongoDB"));
assertThat(tags.get(1).getValue(), is("test"));
assertThat(tags.get(2).getValue(), is("user"));
assertThat(tags.get(3).getValue(), is("{\"name\": \"by\"}"));
assertThat(span.isExit(), is(true));
assertThat(SpanHelper.getLayer(span), CoreMatchers.is(SpanLayer.DB));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ public class OperationNamespaceConstructInterceptor implements InstanceConstruct
@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
MongoNamespace mongoNamespace = (MongoNamespace) allArguments[0];
String databaseName = mongoNamespace.getDatabaseName();
objInst.setSkyWalkingDynamicField(databaseName);
objInst.setSkyWalkingDynamicField(mongoNamespace);
}

}
Loading