diff --git a/README.md b/README.md
index 13113123..ba8d88e1 100644
--- a/README.md
+++ b/README.md
@@ -60,6 +60,12 @@ and if you also require basic auth for your schema registry connection you shoul
--schemaregistry.auth=username:password
```
+or, if you are using the AWS Glue Schema Registry
+```
+--schemaregistry.glue.region=us-east-1 --schemaregistry.glue.registryName=demo-registry
+```
+
+
Finally, a default message and key format (e.g. to deserialize Avro messages or keys) can optionally be configured as follows:
```
--message.format=AVRO
diff --git a/pom.xml b/pom.xml
index cbe8dd92..3ce2ca53 100644
--- a/pom.xml
+++ b/pom.xml
@@ -120,6 +120,13 @@
0.9.3
+
+
+ software.amazon.glue
+ schema-registry-serde
+ 1.1.15
+
+
org.springframework.boot
diff --git a/src/main/java/kafdrop/config/GlueSchemaRegistryConfiguration.java b/src/main/java/kafdrop/config/GlueSchemaRegistryConfiguration.java
new file mode 100644
index 00000000..4ad9ba8d
--- /dev/null
+++ b/src/main/java/kafdrop/config/GlueSchemaRegistryConfiguration.java
@@ -0,0 +1,47 @@
+package kafdrop.config;
+
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.stereotype.Component;
+
+@Configuration
+public class GlueSchemaRegistryConfiguration {
+ @Component
+ @ConfigurationProperties(prefix = "schemaregistry.glue")
+ public static final class GlueSchemaRegistryProperties {
+ private String registryName;
+ private String region;
+
+ private String awsEndpoint;
+
+ public String getRegion() {
+ return region;
+ }
+
+ public void setRegion(String region) {
+ this.region = region;
+ }
+
+ public String getRegistryName() {
+ return registryName;
+ }
+
+ public void setRegistryName(String registryName) {
+ this.registryName = registryName;
+ }
+
+ public String getAwsEndpoint() {
+ return awsEndpoint;
+ }
+
+ public void setAwsEndpoint(String awsEndpoint) {
+ this.awsEndpoint = awsEndpoint;
+ }
+
+ public boolean isConfigured() {
+ return (StringUtils.isNotEmpty(region) && StringUtils.isNotEmpty(registryName));
+
+ }
+ }
+}
diff --git a/src/main/java/kafdrop/controller/MessageController.java b/src/main/java/kafdrop/controller/MessageController.java
index b25a7b12..9eba08f3 100644
--- a/src/main/java/kafdrop/controller/MessageController.java
+++ b/src/main/java/kafdrop/controller/MessageController.java
@@ -28,6 +28,7 @@
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
+import kafdrop.config.GlueSchemaRegistryConfiguration;
import kafdrop.util.*;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Controller;
@@ -48,6 +49,7 @@
import kafdrop.config.MessageFormatConfiguration.MessageFormatProperties;
import kafdrop.config.ProtobufDescriptorConfiguration.ProtobufDescriptorProperties;
import kafdrop.config.SchemaRegistryConfiguration.SchemaRegistryProperties;
+import kafdrop.config.GlueSchemaRegistryConfiguration.GlueSchemaRegistryProperties;
import kafdrop.model.MessageVO;
import kafdrop.model.TopicPartitionVO;
import kafdrop.model.TopicVO;
@@ -65,14 +67,17 @@ public final class MessageController {
private final SchemaRegistryProperties schemaRegistryProperties;
+ private final GlueSchemaRegistryProperties glueSchemaRegistryProperties;
+
private final ProtobufDescriptorProperties protobufProperties;
- public MessageController(KafkaMonitor kafkaMonitor, MessageInspector messageInspector, MessageFormatProperties messageFormatProperties, SchemaRegistryProperties schemaRegistryProperties, ProtobufDescriptorProperties protobufProperties) {
+ public MessageController(KafkaMonitor kafkaMonitor, MessageInspector messageInspector, MessageFormatProperties messageFormatProperties, SchemaRegistryProperties schemaRegistryProperties, ProtobufDescriptorProperties protobufProperties, GlueSchemaRegistryProperties glueSchemaRegistryProperties) {
this.kafkaMonitor = kafkaMonitor;
this.messageInspector = messageInspector;
this.messageFormatProperties = messageFormatProperties;
this.schemaRegistryProperties = schemaRegistryProperties;
- this.protobufProperties = protobufProperties;
+ this.glueSchemaRegistryProperties = glueSchemaRegistryProperties;
+ this.protobufProperties = protobufProperties;
}
/**
@@ -259,10 +264,7 @@ private MessageDeserializer getDeserializer(String topicName, MessageFormat form
final MessageDeserializer deserializer;
if (format == MessageFormat.AVRO) {
- final var schemaRegistryUrl = schemaRegistryProperties.getConnect();
- final var schemaRegistryAuth = schemaRegistryProperties.getAuth();
-
- deserializer = new AvroMessageDeserializer(topicName, schemaRegistryUrl, schemaRegistryAuth);
+ deserializer = new AvroMessageDeserializer(topicName, schemaRegistryProperties, glueSchemaRegistryProperties);
} else if (format == MessageFormat.PROTOBUF && null != descFile) {
// filter the input file name
diff --git a/src/main/java/kafdrop/util/AvroMessageDeserializer.java b/src/main/java/kafdrop/util/AvroMessageDeserializer.java
index b01ea072..f2c89b15 100644
--- a/src/main/java/kafdrop/util/AvroMessageDeserializer.java
+++ b/src/main/java/kafdrop/util/AvroMessageDeserializer.java
@@ -1,6 +1,14 @@
package kafdrop.util;
+import com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer;
+import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
+import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
import io.confluent.kafka.serializers.*;
+import kafdrop.config.GlueSchemaRegistryConfiguration;
+import kafdrop.config.SchemaRegistryConfiguration;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.common.serialization.Deserializer;
+import software.amazon.awssdk.services.glue.model.DataFormat;
import java.nio.*;
import java.util.*;
@@ -8,21 +16,44 @@
public final class AvroMessageDeserializer implements MessageDeserializer {
private final String topicName;
- private final KafkaAvroDeserializer deserializer;
- public AvroMessageDeserializer(String topicName, String schemaRegistryUrl, String schemaRegistryAuth) {
+ private final Deserializer avroDeserializer;
+
+ public AvroMessageDeserializer(String topicName,
+ SchemaRegistryConfiguration.SchemaRegistryProperties schemaRegistryProperties,
+ GlueSchemaRegistryConfiguration.GlueSchemaRegistryProperties glueSchemaRegistryProperties) {
this.topicName = topicName;
- this.deserializer = getDeserializer(schemaRegistryUrl, schemaRegistryAuth);
+
+ if(glueSchemaRegistryProperties.isConfigured()){
+ this.avroDeserializer = getAWSDeserializer(glueSchemaRegistryProperties.getRegion(),
+ glueSchemaRegistryProperties.getRegistryName(),
+ glueSchemaRegistryProperties.getAwsEndpoint());
+ }
+ else{
+ this.avroDeserializer = getDeserializer(schemaRegistryProperties.getConnect(), schemaRegistryProperties.getAuth());
+ }
}
@Override
public String deserializeMessage(ByteBuffer buffer) {
// Convert byte buffer to byte array
final var bytes = ByteUtils.convertToByteArray(buffer);
- return deserializer.deserialize(topicName, bytes).toString();
+ return avroDeserializer.deserialize(topicName, bytes).toString();
+ }
+
+ private static Deserializer getAWSDeserializer(String region, String registryName, String awsEndpoint) {
+ final var config = new HashMap();
+ config.put(AWSSchemaRegistryConstants.AWS_REGION, region);
+ config.put(AWSSchemaRegistryConstants.REGISTRY_NAME, registryName);
+ config.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
+ config.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.AVRO.name());
+ if(StringUtils.isNotEmpty(awsEndpoint))
+ config.put(AWSSchemaRegistryConstants.AWS_ENDPOINT, awsEndpoint);
+ final var awsKafkaAvroDeserializer = new AWSKafkaAvroDeserializer(config);
+ return awsKafkaAvroDeserializer;
}
- private static KafkaAvroDeserializer getDeserializer(String schemaRegistryUrl, String schemaRegistryAuth) {
+ private static Deserializer getDeserializer(String schemaRegistryUrl, String schemaRegistryAuth) {
final var config = new HashMap();
config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
if (schemaRegistryAuth != null) {