diff --git a/README.md b/README.md index 13113123..ba8d88e1 100644 --- a/README.md +++ b/README.md @@ -60,6 +60,12 @@ and if you also require basic auth for your schema registry connection you shoul --schemaregistry.auth=username:password ``` +or, if you are using the AWS Glue Schema Registry +``` +--schemaregistry.glue.region=us-east-1 --schemaregistry.glue.registryName=demo-registry +``` + + Finally, a default message and key format (e.g. to deserialize Avro messages or keys) can optionally be configured as follows: ``` --message.format=AVRO diff --git a/pom.xml b/pom.xml index cbe8dd92..3ce2ca53 100644 --- a/pom.xml +++ b/pom.xml @@ -120,6 +120,13 @@ 0.9.3 + + + software.amazon.glue + schema-registry-serde + 1.1.15 + + org.springframework.boot diff --git a/src/main/java/kafdrop/config/GlueSchemaRegistryConfiguration.java b/src/main/java/kafdrop/config/GlueSchemaRegistryConfiguration.java new file mode 100644 index 00000000..4ad9ba8d --- /dev/null +++ b/src/main/java/kafdrop/config/GlueSchemaRegistryConfiguration.java @@ -0,0 +1,47 @@ +package kafdrop.config; + +import org.apache.commons.lang3.StringUtils; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; +import org.springframework.stereotype.Component; + +@Configuration +public class GlueSchemaRegistryConfiguration { + @Component + @ConfigurationProperties(prefix = "schemaregistry.glue") + public static final class GlueSchemaRegistryProperties { + private String registryName; + private String region; + + private String awsEndpoint; + + public String getRegion() { + return region; + } + + public void setRegion(String region) { + this.region = region; + } + + public String getRegistryName() { + return registryName; + } + + public void setRegistryName(String registryName) { + this.registryName = registryName; + } + + public String getAwsEndpoint() { + return awsEndpoint; + } + + public void setAwsEndpoint(String awsEndpoint) { + this.awsEndpoint = awsEndpoint; + } + + public boolean isConfigured() { + return (StringUtils.isNotEmpty(region) && StringUtils.isNotEmpty(registryName)); + + } + } +} diff --git a/src/main/java/kafdrop/controller/MessageController.java b/src/main/java/kafdrop/controller/MessageController.java index b25a7b12..9eba08f3 100644 --- a/src/main/java/kafdrop/controller/MessageController.java +++ b/src/main/java/kafdrop/controller/MessageController.java @@ -28,6 +28,7 @@ import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; +import kafdrop.config.GlueSchemaRegistryConfiguration; import kafdrop.util.*; import org.springframework.http.MediaType; import org.springframework.stereotype.Controller; @@ -48,6 +49,7 @@ import kafdrop.config.MessageFormatConfiguration.MessageFormatProperties; import kafdrop.config.ProtobufDescriptorConfiguration.ProtobufDescriptorProperties; import kafdrop.config.SchemaRegistryConfiguration.SchemaRegistryProperties; +import kafdrop.config.GlueSchemaRegistryConfiguration.GlueSchemaRegistryProperties; import kafdrop.model.MessageVO; import kafdrop.model.TopicPartitionVO; import kafdrop.model.TopicVO; @@ -65,14 +67,17 @@ public final class MessageController { private final SchemaRegistryProperties schemaRegistryProperties; + private final GlueSchemaRegistryProperties glueSchemaRegistryProperties; + private final ProtobufDescriptorProperties protobufProperties; - public MessageController(KafkaMonitor kafkaMonitor, MessageInspector messageInspector, MessageFormatProperties messageFormatProperties, SchemaRegistryProperties schemaRegistryProperties, ProtobufDescriptorProperties protobufProperties) { + public MessageController(KafkaMonitor kafkaMonitor, MessageInspector messageInspector, MessageFormatProperties messageFormatProperties, SchemaRegistryProperties schemaRegistryProperties, ProtobufDescriptorProperties protobufProperties, GlueSchemaRegistryProperties glueSchemaRegistryProperties) { this.kafkaMonitor = kafkaMonitor; this.messageInspector = messageInspector; this.messageFormatProperties = messageFormatProperties; this.schemaRegistryProperties = schemaRegistryProperties; - this.protobufProperties = protobufProperties; + this.glueSchemaRegistryProperties = glueSchemaRegistryProperties; + this.protobufProperties = protobufProperties; } /** @@ -259,10 +264,7 @@ private MessageDeserializer getDeserializer(String topicName, MessageFormat form final MessageDeserializer deserializer; if (format == MessageFormat.AVRO) { - final var schemaRegistryUrl = schemaRegistryProperties.getConnect(); - final var schemaRegistryAuth = schemaRegistryProperties.getAuth(); - - deserializer = new AvroMessageDeserializer(topicName, schemaRegistryUrl, schemaRegistryAuth); + deserializer = new AvroMessageDeserializer(topicName, schemaRegistryProperties, glueSchemaRegistryProperties); } else if (format == MessageFormat.PROTOBUF && null != descFile) { // filter the input file name diff --git a/src/main/java/kafdrop/util/AvroMessageDeserializer.java b/src/main/java/kafdrop/util/AvroMessageDeserializer.java index b01ea072..f2c89b15 100644 --- a/src/main/java/kafdrop/util/AvroMessageDeserializer.java +++ b/src/main/java/kafdrop/util/AvroMessageDeserializer.java @@ -1,6 +1,14 @@ package kafdrop.util; +import com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer; +import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; +import com.amazonaws.services.schemaregistry.utils.AvroRecordType; import io.confluent.kafka.serializers.*; +import kafdrop.config.GlueSchemaRegistryConfiguration; +import kafdrop.config.SchemaRegistryConfiguration; +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.common.serialization.Deserializer; +import software.amazon.awssdk.services.glue.model.DataFormat; import java.nio.*; import java.util.*; @@ -8,21 +16,44 @@ public final class AvroMessageDeserializer implements MessageDeserializer { private final String topicName; - private final KafkaAvroDeserializer deserializer; - public AvroMessageDeserializer(String topicName, String schemaRegistryUrl, String schemaRegistryAuth) { + private final Deserializer avroDeserializer; + + public AvroMessageDeserializer(String topicName, + SchemaRegistryConfiguration.SchemaRegistryProperties schemaRegistryProperties, + GlueSchemaRegistryConfiguration.GlueSchemaRegistryProperties glueSchemaRegistryProperties) { this.topicName = topicName; - this.deserializer = getDeserializer(schemaRegistryUrl, schemaRegistryAuth); + + if(glueSchemaRegistryProperties.isConfigured()){ + this.avroDeserializer = getAWSDeserializer(glueSchemaRegistryProperties.getRegion(), + glueSchemaRegistryProperties.getRegistryName(), + glueSchemaRegistryProperties.getAwsEndpoint()); + } + else{ + this.avroDeserializer = getDeserializer(schemaRegistryProperties.getConnect(), schemaRegistryProperties.getAuth()); + } } @Override public String deserializeMessage(ByteBuffer buffer) { // Convert byte buffer to byte array final var bytes = ByteUtils.convertToByteArray(buffer); - return deserializer.deserialize(topicName, bytes).toString(); + return avroDeserializer.deserialize(topicName, bytes).toString(); + } + + private static Deserializer getAWSDeserializer(String region, String registryName, String awsEndpoint) { + final var config = new HashMap(); + config.put(AWSSchemaRegistryConstants.AWS_REGION, region); + config.put(AWSSchemaRegistryConstants.REGISTRY_NAME, registryName); + config.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName()); + config.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.AVRO.name()); + if(StringUtils.isNotEmpty(awsEndpoint)) + config.put(AWSSchemaRegistryConstants.AWS_ENDPOINT, awsEndpoint); + final var awsKafkaAvroDeserializer = new AWSKafkaAvroDeserializer(config); + return awsKafkaAvroDeserializer; } - private static KafkaAvroDeserializer getDeserializer(String schemaRegistryUrl, String schemaRegistryAuth) { + private static Deserializer getDeserializer(String schemaRegistryUrl, String schemaRegistryAuth) { final var config = new HashMap(); config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); if (schemaRegistryAuth != null) {