From 24cfeb8531ae44228071ab94a1145a6e43eb05d7 Mon Sep 17 00:00:00 2001 From: Gordan Grasarevic Date: Wed, 13 Jul 2022 10:53:47 +0100 Subject: [PATCH] feat: add support for providing SASL configuration --- kafka/config.go | 27 +++++++++++++++++++++++++++ kafka/consumer.go | 10 ++++++++++ kafka/producer.go | 10 ++++++++++ 3 files changed, 47 insertions(+) create mode 100644 kafka/config.go diff --git a/kafka/config.go b/kafka/config.go new file mode 100644 index 0000000..b869ea3 --- /dev/null +++ b/kafka/config.go @@ -0,0 +1,27 @@ +package kafka + +import "github.com/Shopify/sarama" + +type SASLMechanism uint8 + +const ( + SASLMechanismPlain SASLMechanism = iota +) + +func (m SASLMechanism) sarama() sarama.SASLMechanism { + switch m { + default: + fallthrough + case SASLMechanismPlain: + return sarama.SASLTypePlaintext + } +} + +// SASLConfig provides a mechanism to authenticate with a Kafka cluster +// using SASL. +type SASLConfig struct { + Mechanism SASLMechanism + Username string + Password string + Version int16 +} diff --git a/kafka/consumer.go b/kafka/consumer.go index fd1116d..0a8a2a8 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -33,6 +33,7 @@ type AsyncMessageSourceConfig struct { OffsetsRetention time.Duration SessionTimeout time.Duration Version string + SASL *SASLConfig Debug bool } @@ -58,6 +59,15 @@ func (ams *AsyncMessageSourceConfig) buildSaramaConsumerConfig() (*sarama.Config config.Consumer.Group.Session.Timeout = st config.Consumer.Offsets.Retention = ams.OffsetsRetention + if ams.SASL != nil { + config.Net.SASL.Enable = true + config.Net.TLS.Enable = true + config.Net.SASL.Mechanism = ams.SASL.Mechanism.sarama() + config.Net.SASL.User = ams.SASL.Username + config.Net.SASL.Password = ams.SASL.Password + config.Net.SASL.Version = ams.SASL.Version + } + if ams.Version != "" { version, err := sarama.ParseKafkaVersion(ams.Version) if err != nil { diff --git a/kafka/producer.go b/kafka/producer.go index 817234a..4fe1b02 100644 --- a/kafka/producer.go +++ b/kafka/producer.go @@ -23,6 +23,7 @@ type AsyncMessageSinkConfig struct { MaxMessageBytes int KeyFunc func(substrate.Message) []byte Version string + SASL *SASLConfig Debug bool } @@ -154,6 +155,15 @@ func (ams *AsyncMessageSinkConfig) buildSaramaProducerConfig() (*sarama.Config, conf.Producer.Retry.Max = 3 conf.Producer.Timeout = time.Duration(60) * time.Second + if ams.SASL != nil { + conf.Net.SASL.Enable = true + conf.Net.TLS.Enable = true + conf.Net.SASL.Mechanism = ams.SASL.Mechanism.sarama() + conf.Net.SASL.User = ams.SASL.Username + conf.Net.SASL.Password = ams.SASL.Password + conf.Net.SASL.Version = ams.SASL.Version + } + if ams.MaxMessageBytes != 0 { if ams.MaxMessageBytes > int(sarama.MaxRequestSize) { sarama.MaxRequestSize = int32(ams.MaxMessageBytes)