Skip to content

Commit a05e5f6

Browse files
author
Yevgeniy Magdel
committed
[allow_configuring_avro_consumer] Debugging / fixes
1 parent 36c89df commit a05e5f6

File tree

3 files changed

+32
-9
lines changed

3 files changed

+32
-9
lines changed

src/main/java/kafdrop/config/MessageFormatConfiguration.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,26 @@ public void setFormat(MessageFormat format) {
3131
this.format = format;
3232
}
3333
}
34+
35+
@Component
36+
@ConfigurationProperties(prefix = "key")
37+
public static final class KeyFormatProperties {
38+
private MessageFormat format;
39+
40+
@PostConstruct
41+
public void init() {
42+
// Set a default message format if not configured.
43+
if (format == null) {
44+
format = MessageFormat.DEFAULT;
45+
}
46+
}
47+
48+
public MessageFormat getFormat() {
49+
return format;
50+
}
51+
52+
public void setFormat(MessageFormat format) {
53+
this.format = format;
54+
}
55+
}
3456
}

src/main/java/kafdrop/controller/MessageController.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import javax.validation.constraints.Min;
2929
import javax.validation.constraints.NotNull;
3030

31+
import kafdrop.config.MessageFormatConfiguration;
3132
import kafdrop.util.*;
3233
import org.springframework.http.MediaType;
3334
import org.springframework.stereotype.Controller;
@@ -64,19 +65,19 @@ public final class MessageController {
6465
private final MessageInspector messageInspector;
6566

6667
private final MessageFormatProperties messageFormatProperties;
67-
private final MessageFormatProperties keyFormatProperties;
68+
private final MessageFormatConfiguration.KeyFormatProperties keyFormatProperties;
6869

6970
private final SchemaRegistryProperties schemaRegistryProperties;
7071

7172
private final ProtobufDescriptorProperties protobufProperties;
7273

73-
public MessageController(KafkaMonitor kafkaMonitor, MessageInspector messageInspector, MessageFormatProperties messageFormatProperties, MessageFormatProperties keyFormatProperties, SchemaRegistryProperties schemaRegistryProperties, ProtobufDescriptorProperties protobufProperties) {
74+
public MessageController(KafkaMonitor kafkaMonitor, MessageInspector messageInspector, MessageFormatProperties messageFormatProperties, MessageFormatConfiguration.KeyFormatProperties keyFormatProperties, SchemaRegistryProperties schemaRegistryProperties, ProtobufDescriptorProperties protobufProperties) {
7475
this.kafkaMonitor = kafkaMonitor;
7576
this.messageInspector = messageInspector;
7677
this.messageFormatProperties = messageFormatProperties;
7778
this.keyFormatProperties = keyFormatProperties;
7879
this.schemaRegistryProperties = schemaRegistryProperties;
79-
this.protobufProperties = protobufProperties;
80+
this.protobufProperties = protobufProperties;
8081
}
8182

8283
/**
@@ -314,9 +315,9 @@ public static class PartitionOffsetInfo {
314315
private MessageFormat format;
315316

316317
private MessageFormat keyFormat;
317-
318+
318319
private String descFile;
319-
320+
320321
private String msgTypeName;
321322

322323
public PartitionOffsetInfo(int partition, long offset, long count, MessageFormat format) {

src/main/java/kafdrop/util/AvroMessageDeserializer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,18 +37,18 @@ private static KafkaAvroDeserializer getDeserializer(String schemaRegistryUrl, S
3737
}
3838

3939
private static void setConfigFromEnvIfAvailable(String topicName, String configPath, Map<String,Object> config){
40-
4140
String configPrefix = "SCHEMA_REGISTRY";
42-
String topicScopedEnvPath = Arrays.stream(new String[]{configPrefix, configPath.replace(".", "_"), topicName.replace("-", "_") } )
41+
String topicScopedEnvPath = Arrays.stream(new String[]{configPrefix, configPath.replaceAll("\\.", "_"), topicName.replaceAll("-", "_") } )
4342
.map(String::toUpperCase).collect(Collectors.joining("_"));
4443

45-
String noTopicScopedEnvPath = Arrays.stream(new String[]{ "SCHEMA_REGISTRY", configPath.replace(".", "_") })
44+
String noTopicScopedEnvPath = Arrays.stream(new String[]{ configPrefix, configPath.replaceAll("\\.", "_") })
4645
.map(String::toUpperCase).collect(Collectors.joining("_"));
4746

4847
for(String envPath : new String[]{topicScopedEnvPath, noTopicScopedEnvPath}) {
48+
4949
String namingStrategyValue = System.getenv(envPath);
5050
if (namingStrategyValue != null) {
51-
config.put(envPath, namingStrategyValue);
51+
config.put(configPath, namingStrategyValue);
5252
}
5353
}
5454
}

0 commit comments

Comments
 (0)