Skip to content

Commit 63d81b8

Browse files
Indy-ready - kafka-connect (#15134)
Co-authored-by: SylvainJuge <[email protected]>
1 parent 9d33ce3 commit 63d81b8

File tree

2 files changed

+43
-22
lines changed

2 files changed

+43
-22
lines changed

instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/KafkaConnectInstrumentationModule.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@
1111
import com.google.auto.service.AutoService;
1212
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
1313
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
14+
import io.opentelemetry.javaagent.extension.instrumentation.internal.ExperimentalInstrumentationModule;
1415
import java.util.List;
1516
import net.bytebuddy.matcher.ElementMatcher;
1617

1718
@AutoService(InstrumentationModule.class)
18-
public class KafkaConnectInstrumentationModule extends InstrumentationModule {
19+
public class KafkaConnectInstrumentationModule extends InstrumentationModule
20+
implements ExperimentalInstrumentationModule {
1921

2022
public KafkaConnectInstrumentationModule() {
2123
super("kafka-connect", "kafka-connect-2.6");
@@ -31,4 +33,9 @@ public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
3133
// class added in 2.6.0
3234
return hasClassesNamed("org.apache.kafka.connect.sink.SinkConnectorContext");
3335
}
36+
37+
@Override
38+
public boolean isIndyReady() {
39+
return true;
40+
}
3441
}

instrumentation/kafka/kafka-connect-2.6/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaconnect/v2_6/SinkTaskInstrumentation.java

Lines changed: 35 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@
1313

1414
import io.opentelemetry.context.Context;
1515
import io.opentelemetry.context.Scope;
16-
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
1716
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
1817
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
1918
import java.util.Collection;
19+
import javax.annotation.Nullable;
2020
import net.bytebuddy.asm.Advice;
2121
import net.bytebuddy.description.type.TypeDescription;
2222
import net.bytebuddy.matcher.ElementMatcher;
@@ -39,36 +39,50 @@ public void transform(TypeTransformer transformer) {
3939
@SuppressWarnings("unused")
4040
public static class SinkTaskPutAdvice {
4141

42-
@Advice.OnMethodEnter(suppress = Throwable.class)
43-
public static void onEnter(
44-
@Advice.Argument(0) Collection<SinkRecord> records,
45-
@Advice.Local("otelTask") KafkaConnectTask task,
46-
@Advice.Local("otelContext") Context context,
47-
@Advice.Local("otelScope") Scope scope) {
42+
public static class AdviceScope {
43+
private final KafkaConnectTask task;
44+
private final Context context;
45+
private final Scope scope;
46+
47+
private AdviceScope(KafkaConnectTask task, Context context, Scope scope) {
48+
this.task = task;
49+
this.context = context;
50+
this.scope = scope;
51+
}
52+
53+
@Nullable
54+
public static AdviceScope start(Collection<SinkRecord> records) {
55+
Context parentContext = Context.current();
4856

49-
Context parentContext = Java8BytecodeBridge.currentContext();
57+
KafkaConnectTask task = new KafkaConnectTask(records);
58+
if (!instrumenter().shouldStart(parentContext, task)) {
59+
return null;
60+
}
5061

51-
task = new KafkaConnectTask(records);
52-
if (!instrumenter().shouldStart(parentContext, task)) {
53-
return;
62+
Context context = instrumenter().start(parentContext, task);
63+
return new AdviceScope(task, context, context.makeCurrent());
5464
}
5565

56-
context = instrumenter().start(parentContext, task);
57-
scope = context.makeCurrent();
66+
public void end(@Nullable Throwable throwable) {
67+
scope.close();
68+
instrumenter().end(context, task, null, throwable);
69+
}
70+
}
71+
72+
@Nullable
73+
@Advice.OnMethodEnter(suppress = Throwable.class)
74+
public static AdviceScope onEnter(@Advice.Argument(0) Collection<SinkRecord> records) {
75+
return AdviceScope.start(records);
5876
}
5977

6078
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
6179
public static void onExit(
62-
@Advice.Thrown Throwable throwable,
63-
@Advice.Local("otelTask") KafkaConnectTask task,
64-
@Advice.Local("otelContext") Context context,
65-
@Advice.Local("otelScope") Scope scope) {
80+
@Advice.Thrown @Nullable Throwable throwable,
81+
@Advice.Enter @Nullable AdviceScope adviceScope) {
6682

67-
if (scope == null) {
68-
return;
83+
if (adviceScope != null) {
84+
adviceScope.end(throwable);
6985
}
70-
scope.close();
71-
instrumenter().end(context, task, null, throwable);
7286
}
7387
}
7488
}

0 commit comments

Comments
 (0)