Skip to content

Commit c04a714

Browse files
authored
[Feature] Support to configure custom decode method for kafka configurations (#540)
1 parent ddc68e2 commit c04a714

File tree

11 files changed

+179
-5
lines changed

11 files changed

+179
-5
lines changed

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ Release Notes.
1919
* Fix possible IllegalStateException when using Micrometer.
2020
* Support Grizzly Work ThreadPool Metric Monitor
2121
* Fix the gson dependency in the kafka-reporter-plugin.
22+
* Support to config custom decode methods for kafka configurations
2223

2324
#### Documentation
2425

apm-sniffer/config/agent.config

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,8 @@ plugin.kafka.topic_management=${SW_PLUGIN_KAFKA_TOPIC_MANAGEMENT:skywalking-mana
263263
plugin.kafka.topic_logging=${SW_PLUGIN_KAFKA_TOPIC_LOGGING:skywalking-logs}
264264
# isolate multi OAP server when using same Kafka cluster (final topic name will append namespace before Kafka topics with `-` ).
265265
plugin.kafka.namespace=${SW_KAFKA_NAMESPACE:}
266+
# Specify which class to decode encoded configuration of kafka.You can set encoded information in `plugin.kafka.producer_config_json` or `plugin.kafka.producer_config` if you need.
267+
plugin.kafka.decode_class=${SW_KAFKA_DECODE_CLASS:}
266268
# Match spring beans with regular expression for the class name. Multiple expressions could be separated by a comma. This only works when `Spring annotation plugin` has been activated.
267269
plugin.springannotation.classname_match_regex=${SW_SPRINGANNOTATION_CLASSNAME_MATCH_REGEX:}
268270
# Whether or not to transmit logged data as formatted or un-formatted.
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
<!--
2+
~ Licensed to the Apache Software Foundation (ASF) under one or more
3+
~ contributor license agreements. See the NOTICE file distributed with
4+
~ this work for additional information regarding copyright ownership.
5+
~ The ASF licenses this file to You under the Apache License, Version 2.0
6+
~ (the "License"); you may not use this file except in compliance with
7+
~ the License. You may obtain a copy of the License at
8+
~
9+
~ http://www.apache.org/licenses/LICENSE-2.0
10+
~
11+
~ Unless required by applicable law or agreed to in writing, software
12+
~ distributed under the License is distributed on an "AS IS" BASIS,
13+
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
~ See the License for the specific language governing permissions and
15+
~ limitations under the License.
16+
~
17+
-->
18+
19+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
20+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
21+
<parent>
22+
<artifactId>optional-reporter-plugins</artifactId>
23+
<groupId>org.apache.skywalking</groupId>
24+
<version>8.16.0-SNAPSHOT</version>
25+
</parent>
26+
<modelVersion>4.0.0</modelVersion>
27+
28+
<artifactId>kafka-config-extension</artifactId>
29+
<packaging>jar</packaging>
30+
31+
<url>http://maven.apache.org</url>
32+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.agent.core.kafka;
20+
21+
import java.util.Map;
22+
23+
public interface KafkaConfigExtension {
24+
Map<String, String> decode(Map<String, String> config);
25+
}

apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,11 @@
3434
<artifactId>kafka-clients</artifactId>
3535
<version>${kafka-clients.version}</version>
3636
</dependency>
37+
<dependency>
38+
<groupId>org.apache.skywalking</groupId>
39+
<artifactId>kafka-config-extension</artifactId>
40+
<version>${project.version}</version>
41+
</dependency>
3742
</dependencies>
3843

3944

apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
package org.apache.skywalking.apm.agent.core.kafka;
2020

2121
import com.google.gson.Gson;
22+
23+
import java.lang.reflect.InvocationTargetException;
2224
import java.util.ArrayList;
2325
import java.util.HashSet;
2426
import java.util.List;
@@ -108,9 +110,9 @@ public void run() {
108110
if (StringUtil.isNotEmpty(Kafka.PRODUCER_CONFIG_JSON)) {
109111
Gson gson = new Gson();
110112
Map<String, String> config = (Map<String, String>) gson.fromJson(Kafka.PRODUCER_CONFIG_JSON, Map.class);
111-
config.forEach(properties::setProperty);
113+
decode(config).forEach(properties::setProperty);
112114
}
113-
Kafka.PRODUCER_CONFIG.forEach(properties::setProperty);
115+
decode(Kafka.PRODUCER_CONFIG).forEach(properties::setProperty);
114116

115117
try (AdminClient adminClient = AdminClient.create(properties)) {
116118
DescribeTopicsResult topicsResult = adminClient.describeTopics(topics);
@@ -153,6 +155,22 @@ private void notifyListeners(KafkaConnectionStatus status) {
153155
}
154156
}
155157

158+
private Map<String, String> decode(Map<String, String> config) {
159+
if (StringUtil.isBlank(Kafka.DECODE_CLASS)) {
160+
return config;
161+
}
162+
try {
163+
Object decodeTool = Class.forName(Kafka.DECODE_CLASS).getDeclaredConstructor().newInstance();
164+
if (decodeTool instanceof KafkaConfigExtension) {
165+
return ((KafkaConfigExtension) decodeTool).decode(config);
166+
}
167+
} catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
168+
// ignore
169+
LOGGER.warn("The decode class {} does not exist, exception:{}.", Kafka.DECODE_CLASS, e);
170+
}
171+
return config;
172+
}
173+
156174
/**
157175
* Get the KafkaProducer instance to send data to Kafka broker.
158176
* @return Kafka producer

apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaReporterPluginConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ public static class Kafka {
6363
* Timeout period of reading topics from the Kafka server, the unit is second.
6464
*/
6565
public static int GET_TOPIC_TIMEOUT = 10;
66+
/**
67+
* Class name of decoding encoded information in kafka configuration.
68+
*/
69+
public static String DECODE_CLASS = "";
6670
}
6771
}
6872
}

apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/test/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManagerTest.java

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,16 @@
1818

1919
package org.apache.skywalking.apm.agent.core.kafka;
2020

21-
import static org.junit.Assert.assertEquals;
21+
import org.junit.Test;
22+
2223
import java.lang.reflect.Method;
24+
import java.nio.charset.StandardCharsets;
25+
import java.util.Base64;
26+
import java.util.HashMap;
27+
import java.util.Map;
2328
import java.util.concurrent.atomic.AtomicInteger;
24-
import org.junit.Test;
29+
30+
import static org.junit.Assert.assertEquals;
2531

2632
public class KafkaProducerManagerTest {
2733
@Test
@@ -54,6 +60,22 @@ public void testFormatTopicNameThenRegister() {
5460
assertEquals(KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_METRICS, value);
5561
}
5662

63+
@Test
64+
public void testDecode() throws Exception {
65+
KafkaReporterPluginConfig.Plugin.Kafka.DECODE_CLASS = "org.apache.skywalking.apm.agent.core.kafka.KafkaProducerManagerTest$DecodeTool";
66+
KafkaProducerManager kafkaProducerManager = new KafkaProducerManager();
67+
68+
Map<String, String> config = new HashMap<>();
69+
String value = "test.99998888";
70+
config.put("test.password", Base64.getEncoder().encodeToString(value.getBytes(StandardCharsets.UTF_8)));
71+
72+
Method decodeMethod = kafkaProducerManager.getClass().getDeclaredMethod("decode", Map.class);
73+
decodeMethod.setAccessible(true);
74+
Map<String, String> decodeConfig = (Map<String, String>) decodeMethod.invoke(kafkaProducerManager, config);
75+
76+
assertEquals(value, decodeConfig.get("test.password"));
77+
}
78+
5779
static class MockListener implements KafkaConnectionStatusListener {
5880

5981
private AtomicInteger counter;
@@ -68,4 +90,14 @@ public void onStatusChanged(KafkaConnectionStatus status) {
6890
}
6991
}
7092

93+
static class DecodeTool implements KafkaConfigExtension {
94+
@Override
95+
public Map<String, String> decode(Map<String, String> config) {
96+
if (config.containsKey("test.password")) {
97+
config.put("test.password", new String(Base64.getDecoder().decode(config.get("test.password")), StandardCharsets.UTF_8));
98+
}
99+
return config;
100+
}
101+
}
102+
71103
}

apm-sniffer/optional-reporter-plugins/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
<modules>
3232
<module>kafka-reporter-plugin</module>
33+
<module>kafka-config-extension</module>
3334
</modules>
3435

3536
<properties>

docs/en/setup/service-agent/java-agent/advanced-reporters.md

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,64 @@ plugin.kafka.producer_config_json={"delivery.timeout.ms": 12000, "compression.ty
3333

3434
Currently, there are 2 ways to configure advanced configurations below. Notice that, the new way, configured in JSON format, will be overridden by `plugin.kafka.producer_config[key]=value` when they have the duplication keys.
3535

36+
Since 8.16.0, users could implement their decoder for kafka configurations rather than using plain configurations(such as `password`) of Kafka producer,
37+
Including `plugin.kafka.producer_config_json`,`plugin.kafka.producer_config` or environment variable `SW_PLUGIN_KAFKA_PRODUCER_CONFIG_JSON`.
38+
39+
By doing that, add the `kafka-config-extension` dependency to your decoder project and implement `decode` interface.
40+
41+
- Add the `KafkaConfigExtension` dependency to your project.
42+
```
43+
<dependency>
44+
<groupId>org.apache.skywalking</groupId>
45+
<artifactId>kafka-config-extension</artifactId>
46+
<version>${skywalking.version}</version>
47+
<scope>provided</scope>
48+
</dependency>
49+
```
50+
51+
- Implement your custom decode method.Like this:
52+
```
53+
package org.apache.skywalking.apm.agent.sample;
54+
55+
import org.apache.skywalking.apm.agent.core.kafka.KafkaConfigExtension;
56+
import java.util.Map;
57+
58+
/**
59+
* Custom decode class
60+
*/
61+
public class DecodeUtil implements KafkaConfigExtension {
62+
/**
63+
* Custom decode method.
64+
* @param config the value of `plugin.kafka.producer_config` or `plugin.kafka.producer_config_json` in `agent.config`.
65+
* @return the decoded configuration if you implement your custom decode logic.
66+
*/
67+
public Map<String, String> decode(Map<String, String> config) {
68+
/**
69+
* implement your custom decode logic
70+
* */
71+
return config;
72+
}
73+
}
74+
```
75+
76+
Then, package your decoder project as a jar and move to `agent/plugins`.
77+
78+
**Notice, the jar package should contain all the dependencies required for your custom decode code.**
79+
80+
The last step is to activate the decoder class in `agent.config` like this:
81+
```
82+
plugin.kafka.decrypt_class="org.apache.skywalking.apm.agent.sample.DecodeUtil"
83+
```
84+
or configure by environment variable
85+
```
86+
SW_KAFKA_DECRYPT_CLASS="org.apache.skywalking.apm.agent.sample.DecodeUtil"
87+
```
88+
3689
## 3rd party reporters
3790
There are other reporter implementations from out of the Apache Software Foundation.
3891

3992
### Pulsar Reporter
4093
Go to [Pulsar-reporter-plugin](https://github.com/SkyAPM/transporter-plugin-for-skywalking/blob/main/docs/en/pulsar/Pulsar-Reporter.md) for more details.
4194

4295
### RocketMQ Reporter
43-
Go to [RocketMQ-reporter-plugin](https://github.com/SkyAPM/transporter-plugin-for-skywalking/blob/main/docs/en/rocketmq/Rocketmq-Reporter.md) for more details.
96+
Go to [RocketMQ-reporter-plugin](https://github.com/SkyAPM/transporter-plugin-for-skywalking/blob/main/docs/en/rocketmq/Rocketmq-Reporter.md) for more details.

0 commit comments

Comments
 (0)