Skip to content

Commit 789d039

Browse files
BE: Support PEM trust/key stores and mTLS configuration. (#1437)
1 parent c30851c commit 789d039

File tree

16 files changed

+287
-162
lines changed

16 files changed

+287
-162
lines changed

api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,8 @@ public static WebClient buildWebClient(DataSize maxBuffSize,
342342
.configureSsl(
343343
truststoreConfig,
344344
new ClustersProperties.KeystoreConfig(
345+
config.getKeystoreType(),
346+
config.getKeystoreCertificate(),
345347
config.getKeystoreLocation(),
346348
config.getKeystorePassword()
347349
)

api/src/main/java/io/kafbat/ui/config/ClustersProperties.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import static io.kafbat.ui.model.MetricsScrapeProperties.JMX_METRICS_TYPE;
44

5+
import io.kafbat.ui.api.model.SecurityProtocol;
56
import jakarta.annotation.PostConstruct;
67
import jakarta.validation.Valid;
78
import jakarta.validation.constraints.NotBlank;
@@ -63,7 +64,9 @@ public static class Cluster {
6364
@NotBlank(message = "field bootstrapServers for for cluster could not be blank")
6465
String bootstrapServers;
6566

67+
SecurityProtocol securityProtocol;
6668
TruststoreConfig ssl;
69+
KeystoreConfig kafkaSsl;
6770

6871
String schemaRegistry;
6972
SchemaRegistryAuth schemaRegistryAuth;
@@ -108,6 +111,8 @@ public static class MetricsConfig {
108111
Boolean ssl;
109112
String username;
110113
String password;
114+
StoreType keystoreType;
115+
String keystoreCertificate;
111116
String keystoreLocation;
112117
String keystorePassword;
113118

@@ -143,6 +148,8 @@ public static class ConnectCluster {
143148
String address;
144149
String username;
145150
String password;
151+
StoreType keystoreType;
152+
String keystoreCertificate;
146153
String keystoreLocation;
147154
String keystorePassword;
148155
}
@@ -154,9 +161,14 @@ public static class SchemaRegistryAuth {
154161
String password;
155162
}
156163

164+
public enum StoreType {
165+
JKS, PKCS12, PEM
166+
}
167+
157168
@Data
158169
@ToString(exclude = {"truststorePassword"})
159170
public static class TruststoreConfig {
171+
StoreType truststoreType;
160172
String truststoreLocation;
161173
String truststorePassword;
162174
boolean verifySsl = true;
@@ -167,6 +179,8 @@ public static class TruststoreConfig {
167179
@AllArgsConstructor
168180
@ToString(exclude = {"keystorePassword"})
169181
public static class KeystoreConfig {
182+
StoreType keystoreType;
183+
String keystoreCertificate;
170184
String keystoreLocation;
171185
String keystorePassword;
172186
}

api/src/main/java/io/kafbat/ui/model/MetricsScrapeProperties.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,24 @@ public class MetricsScrapeProperties {
3030

3131
public static MetricsScrapeProperties create(ClustersProperties.Cluster cluster) {
3232
var metrics = Objects.requireNonNull(cluster.getMetrics());
33+
34+
KeystoreConfig keystoreConfig = null;
35+
if (metrics.getKeystoreLocation() != null) {
36+
keystoreConfig = new KeystoreConfig(
37+
metrics.getKeystoreType(),
38+
metrics.getKeystoreCertificate(),
39+
metrics.getKeystoreLocation(),
40+
metrics.getKeystorePassword()
41+
);
42+
}
43+
3344
return MetricsScrapeProperties.builder()
3445
.port(metrics.getPort())
3546
.ssl(Optional.ofNullable(metrics.getSsl()).orElse(false))
3647
.username(metrics.getUsername())
3748
.password(metrics.getPassword())
3849
.truststoreConfig(cluster.getSsl())
39-
.keystoreConfig(
40-
metrics.getKeystoreLocation() != null
41-
? new KeystoreConfig(metrics.getKeystoreLocation(), metrics.getKeystorePassword())
42-
: null
43-
)
50+
.keystoreConfig(keystoreConfig)
4451
.build();
4552
}
4653

api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,12 @@ public Mono<ReactiveAdminClient> get(KafkaCluster cluster) {
4545
private Mono<ReactiveAdminClient> createAdminClient(KafkaCluster cluster) {
4646
return Mono.fromSupplier(() -> {
4747
Properties properties = new Properties();
48-
KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
48+
KafkaClientSslPropertiesUtil.addKafkaSslProperties(
49+
cluster.getOriginalProperties().getSsl(),
50+
cluster.getOriginalProperties().getKafkaSsl(),
51+
cluster.getOriginalProperties().getSecurityProtocol(),
52+
properties
53+
);
4954
properties.putAll(cluster.getProperties());
5055
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
5156
properties.putIfAbsent(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout);

api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,12 @@ public EnhancedConsumer createConsumer(KafkaCluster cluster) {
353353
public EnhancedConsumer createConsumer(KafkaCluster cluster,
354354
Map<String, Object> properties) {
355355
Properties props = new Properties();
356-
KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), props);
356+
KafkaClientSslPropertiesUtil.addKafkaSslProperties(
357+
cluster.getOriginalProperties().getSsl(),
358+
cluster.getOriginalProperties().getKafkaSsl(),
359+
cluster.getOriginalProperties().getSecurityProtocol(),
360+
props
361+
);
357362
props.putAll(cluster.getProperties());
358363
props.putAll(cluster.getConsumerProperties());
359364
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafbat-ui-consumer-" + System.currentTimeMillis());

api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import static io.kafbat.ui.util.KafkaServicesValidation.validateKsql;
55
import static io.kafbat.ui.util.KafkaServicesValidation.validatePrometheusStore;
66
import static io.kafbat.ui.util.KafkaServicesValidation.validateSchemaRegistry;
7-
import static io.kafbat.ui.util.KafkaServicesValidation.validateTruststore;
7+
import static io.kafbat.ui.util.KafkaServicesValidation.validateSslBundle;
88

99
import io.kafbat.ui.client.RetryingKafkaConnectClient;
1010
import io.kafbat.ui.config.ClustersProperties;
@@ -105,20 +105,22 @@ public KafkaCluster create(ClustersProperties properties,
105105

106106
public Mono<ClusterConfigValidationDTO> validate(ClustersProperties.Cluster clusterProperties) {
107107
if (clusterProperties.getSsl() != null) {
108-
Optional<String> errMsg = validateTruststore(clusterProperties.getSsl());
108+
Optional<String> errMsg = validateSslBundle(clusterProperties.getSsl(), clusterProperties.getKafkaSsl());
109109
if (errMsg.isPresent()) {
110110
return Mono.just(new ClusterConfigValidationDTO()
111111
.kafka(new ApplicationPropertyValidationDTO()
112112
.error(true)
113-
.errorMessage("Truststore not valid: " + errMsg.get())));
113+
.errorMessage("Truststore/Keystore not valid: " + errMsg.get())));
114114
}
115115
}
116116

117117
return Mono.zip(
118118
validateClusterConnection(
119119
clusterProperties.getBootstrapServers(),
120120
convertProperties(clusterProperties.getProperties()),
121-
clusterProperties.getSsl()
121+
clusterProperties.getSsl(),
122+
clusterProperties.getKafkaSsl(),
123+
clusterProperties.getSecurityProtocol()
122124
),
123125
schemaRegistryConfigured(clusterProperties)
124126
? validateSchemaRegistry(() -> schemaRegistryClient(clusterProperties)).map(Optional::of)

api/src/main/java/io/kafbat/ui/service/MessagesService.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,12 @@ public static KafkaProducer<byte[], byte[]> createProducer(KafkaCluster cluster,
204204
public static KafkaProducer<byte[], byte[]> createProducer(ClustersProperties.Cluster cluster,
205205
Map<String, Object> additionalProps) {
206206
Properties properties = new Properties();
207-
KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getSsl(), properties);
207+
KafkaClientSslPropertiesUtil.addKafkaSslProperties(
208+
cluster.getSsl(),
209+
cluster.getKafkaSsl(),
210+
cluster.getSecurityProtocol(),
211+
properties
212+
);
208213
properties.putAll(cluster.getProperties());
209214
properties.putAll(cluster.getProducerProperties());
210215
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());

api/src/main/java/io/kafbat/ui/service/metrics/scrape/jmx/JmxMetricsRetriever.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -105,12 +105,7 @@ private Map<String, Object> prepareJmxEnvAndSetThreadLocal(MetricsScrapeProperti
105105
if (isSslJmxEndpoint(scrapeProperties)) {
106106
var truststoreConfig = scrapeProperties.getTruststoreConfig();
107107
var keystoreConfig = scrapeProperties.getKeystoreConfig();
108-
JmxSslSocketFactory.setSslContextThreadLocal(
109-
truststoreConfig != null ? truststoreConfig.getTruststoreLocation() : null,
110-
truststoreConfig != null ? truststoreConfig.getTruststorePassword() : null,
111-
keystoreConfig != null ? keystoreConfig.getKeystoreLocation() : null,
112-
keystoreConfig != null ? keystoreConfig.getKeystorePassword() : null
113-
);
108+
JmxSslSocketFactory.setSslContextThreadLocal(truststoreConfig, keystoreConfig);
114109
JmxSslSocketFactory.editJmxConnectorEnv(env);
115110
}
116111

@@ -144,4 +139,3 @@ private List<RawMetric> extractObjectMetrics(ObjectName objectName, MBeanServerC
144139
}
145140

146141
}
147-

api/src/main/java/io/kafbat/ui/service/metrics/scrape/jmx/JmxSslSocketFactory.java

Lines changed: 10 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,20 @@
11
package io.kafbat.ui.service.metrics.scrape.jmx;
22

33
import com.google.common.base.Preconditions;
4-
import java.io.FileInputStream;
4+
import io.kafbat.ui.config.ClustersProperties;
5+
import io.kafbat.ui.util.SslBundleUtil;
56
import java.io.IOException;
67
import java.lang.reflect.Field;
78
import java.net.InetAddress;
89
import java.net.Socket;
9-
import java.net.UnknownHostException;
10-
import java.security.KeyStore;
1110
import java.util.Map;
1211
import java.util.concurrent.ConcurrentHashMap;
1312
import javax.annotation.Nullable;
14-
import javax.net.ssl.KeyManagerFactory;
1513
import javax.net.ssl.SSLContext;
16-
import javax.net.ssl.TrustManagerFactory;
1714
import javax.rmi.ssl.SslRMIClientSocketFactory;
1815
import lombok.SneakyThrows;
1916
import lombok.extern.slf4j.Slf4j;
20-
import org.springframework.util.ResourceUtils;
17+
import org.springframework.boot.ssl.SslBundle;
2118

2219
/*
2320
* Purpose of this class to provide an ability to connect to different JMX endpoints using different keystores.
@@ -79,18 +76,13 @@ public static boolean initialized() {
7976
private record HostAndPort(String host, int port) {
8077
}
8178

82-
private record Ssl(@Nullable String truststoreLocation,
83-
@Nullable String truststorePassword,
84-
@Nullable String keystoreLocation,
85-
@Nullable String keystorePassword) {
79+
private record Ssl(@Nullable ClustersProperties.TruststoreConfig truststoreConfig,
80+
@Nullable ClustersProperties.KeystoreConfig keystoreConfig) {
8681
}
8782

88-
public static void setSslContextThreadLocal(@Nullable String truststoreLocation,
89-
@Nullable String truststorePassword,
90-
@Nullable String keystoreLocation,
91-
@Nullable String keystorePassword) {
92-
SSL_CONTEXT_THREAD_LOCAL.set(
93-
new Ssl(truststoreLocation, truststorePassword, keystoreLocation, keystorePassword));
83+
public static void setSslContextThreadLocal(@Nullable ClustersProperties.TruststoreConfig truststoreConfig,
84+
@Nullable ClustersProperties.KeystoreConfig keystoreConfig) {
85+
SSL_CONTEXT_THREAD_LOCAL.set(new Ssl(truststoreConfig, keystoreConfig));
9486
}
9587

9688
// should be called when (host:port) -> factory cache should be invalidated (ex. on app config reload)
@@ -118,33 +110,8 @@ public JmxSslSocketFactory() {
118110
@SneakyThrows
119111
private javax.net.ssl.SSLSocketFactory createFactoryFromThreadLocalCtx() {
120112
Ssl ssl = Preconditions.checkNotNull(SSL_CONTEXT_THREAD_LOCAL.get());
121-
122-
var trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
123-
if (ssl.truststoreLocation() != null && ssl.truststorePassword() != null) {
124-
KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
125-
trustStore.load(
126-
new FileInputStream((ResourceUtils.getFile(ssl.truststoreLocation()))),
127-
ssl.truststorePassword().toCharArray()
128-
);
129-
trustManagerFactory.init(trustStore);
130-
}
131-
132-
var keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
133-
if (ssl.keystoreLocation() != null && ssl.keystorePassword() != null) {
134-
KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
135-
keyStore.load(
136-
new FileInputStream(ResourceUtils.getFile(ssl.keystoreLocation())),
137-
ssl.keystorePassword().toCharArray()
138-
);
139-
keyManagerFactory.init(keyStore, ssl.keystorePassword().toCharArray());
140-
}
141-
142-
SSLContext ctx = SSLContext.getInstance("TLS");
143-
ctx.init(
144-
keyManagerFactory.getKeyManagers(),
145-
trustManagerFactory.getTrustManagers(),
146-
null
147-
);
113+
SslBundle bundle = SslBundleUtil.mustLoadBundle(ssl.truststoreConfig(), ssl.keystoreConfig);
114+
SSLContext ctx = bundle.createSslContext();
148115
return ctx.getSocketFactory();
149116
}
150117

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,37 @@
11
package io.kafbat.ui.util;
22

3+
import io.kafbat.ui.api.model.SecurityProtocol;
34
import io.kafbat.ui.config.ClustersProperties;
45
import java.util.Properties;
56
import javax.annotation.Nullable;
7+
import org.apache.kafka.clients.CommonClientConfigs;
68
import org.apache.kafka.common.config.SslConfigs;
9+
import org.springframework.boot.autoconfigure.kafka.SslBundleSslEngineFactory;
10+
import org.springframework.boot.ssl.SslBundle;
711

812
public final class KafkaClientSslPropertiesUtil {
913

1014
private KafkaClientSslPropertiesUtil() {
1115
}
1216

1317
public static void addKafkaSslProperties(@Nullable ClustersProperties.TruststoreConfig truststoreConfig,
18+
@Nullable ClustersProperties.KeystoreConfig keystoreConfig,
19+
@Nullable SecurityProtocol securityProtocol,
1420
Properties sink) {
15-
if (truststoreConfig == null) {
16-
return;
21+
if (securityProtocol != null) {
22+
sink.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name());
1723
}
1824

19-
if (!truststoreConfig.isVerifySsl()) {
25+
if (truststoreConfig != null && !truststoreConfig.isVerifySsl()) {
2026
sink.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
2127
}
2228

23-
if (truststoreConfig.getTruststoreLocation() == null) {
29+
SslBundle bundle = SslBundleUtil.loadBundle(truststoreConfig, keystoreConfig);
30+
if (bundle == null) {
2431
return;
2532
}
2633

27-
sink.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreConfig.getTruststoreLocation());
28-
29-
if (truststoreConfig.getTruststorePassword() != null) {
30-
sink.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststoreConfig.getTruststorePassword());
31-
}
32-
34+
sink.put(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG, SslBundleSslEngineFactory.class);
35+
sink.put(SslBundle.class.getName(), bundle);
3336
}
3437
}

0 commit comments

Comments
 (0)