Commit 2ba01b5

Merge branch 'main' into DINF-2950-schema-prefix-fix
2 parents 6baf000 + 13f545b

File tree: 142 files changed, +3260 −716 lines


README.md

Lines changed: 12 additions & 15 deletions

@@ -72,28 +72,25 @@ Kafbat UI wraps major functions of Apache Kafka with an intuitive user interface
 ![Interface](documentation/images/Interface.gif)

 ## Topics
-Kafbat UI makes it easy for you to create topics in your browser by several clicks,
-pasting your own parameters, and viewing topics in the list.
+Kafbat UI makes it easy for you to create topics in your browser with just a few clicks, by pasting your own parameters, and viewing topics in the list.

 ![Create Topic](documentation/images/Create_topic_kafka-ui.gif)

-It's possible to jump from connectors view to corresponding topics and from a topic to consumers (back and forth) for more convenient navigation.
-connectors, overview topic settings.
+You can jump from the connectors view to corresponding topics and from a topic to consumers (back and forth) for more convenient navigation, including connectors and overview topic settings.

 ![Connector_Topic_Consumer](documentation/images/Connector_Topic_Consumer.gif)

 ### Messages
-Let's say we want to produce messages for our topic. With the Kafbat UI we can send or write data/messages to the Kafka topics without effort by specifying parameters, and viewing messages in the list.
+Suppose you want to produce messages for your topic. With Kafbat UI, you can easily send or write data/messages to Kafka topics by specifying parameters and viewing messages in the list.

 ![Produce Message](documentation/images/Create_message_kafka-ui.gif)

 ## Schema registry
-There are 3 supported types of schemas: Avro®, JSON Schema, and Protobuf schemas.
+There are three supported types of schemas: Avro®, JSON Schema, and Protobuf schemas.

 ![Create Schema Registry](documentation/images/Create_schema.gif)

-Before producing avro/protobuf encoded messages, you have to add a schema for the topic in Schema Registry. Now all these steps are easy to do
-with a few clicks in a user-friendly interface.
+Before producing Avro/Protobuf encoded messages, you need to add a schema for the topic in the Schema Registry. All these steps are now easy to do with just a few clicks in a user-friendly interface.

 ![Avro Schema Topic](documentation/images/Schema_Topic.gif)

@@ -111,7 +108,7 @@ docker run -it -p 8080:8080 -e DYNAMIC_CONFIG_ENABLED=true ghcr.io/kafbat/kafka-
 Then access the web UI at [http://localhost:8080](http://localhost:8080)

-The command is sufficient to try things out. When you're done trying things out, you can proceed with a [persistent installation](https://ui.docs.kafbat.io/quick-start/persistent-start)
+This command is sufficient to try things out. When you're done, you can proceed with a [persistent installation](https://ui.docs.kafbat.io/quick-start/persistent-start).

 ## Persistent installation

@@ -146,24 +143,24 @@ Please refer to our [configuration](https://ui.docs.kafbat.io/configuration/conf
 ## Building from sources
-[Quick start](https://ui.docs.kafbat.io/development/building/prerequisites) with building
+[Quick start](https://ui.docs.kafbat.io/development/building/prerequisites) for building from source

 ## Liveliness and readiness probes
-Liveliness and readiness endpoint is at `/actuator/health`.<br/>
-Info endpoint (build info) is located at `/actuator/info`.
+The liveness and readiness endpoint is at `/actuator/health`.<br/>
+The info endpoint (build info) is located at `/actuator/info`.

 # Configuration options

-All the environment variables/config properties could be found [here](https://ui.docs.kafbat.io/configuration/misc-configuration-properties).
+All environment variables and configuration properties can be found [here](https://ui.docs.kafbat.io/configuration/misc-configuration-properties).

 # Contributing

-Please refer to [contributing guide](https://ui.docs.kafbat.io/development/contributing), we'll guide you from there.
+Please refer to the [contributing guide](https://ui.docs.kafbat.io/development/contributing); we'll guide you from there.

 # Support

 As we're fully independent, team members contribute in their free time.
-Your support is crucial for us, if you wish to sponsor us, take a look [here](https://github.com/sponsors/kafbat)
+Your support is crucial for us, if you wish to sponsor us, take a look [here](https://github.com/sponsors/kafbat)

 # Powered by
api/Dockerfile

Lines changed: 1 addition & 1 deletion

@@ -1,7 +1,7 @@
 # The tag is ignored when a sha is included but the reason to add it are:
 # 1. Self Documentation: It is difficult to find out what the expected tag is given a sha alone
 # 2. Helps dependabot during discovery of upgrades
-FROM azul/zulu-openjdk-alpine:21.0.8-jre-headless@sha256:9c7b4b7850bd4cdd78f91b369accc5b55beffa9a073b9a2bb94caa42606b9444
+FROM azul/zulu-openjdk-alpine:21.0.8-jre-headless@sha256:48356cf1b81243b6e236509bb6ae281969fc0eade03f622220973a4e224cd768

 RUN apk add --no-cache \
     # snappy codec

api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java

Lines changed: 10 additions & 4 deletions

@@ -7,11 +7,13 @@
 import io.kafbat.ui.connect.ApiClient;
 import io.kafbat.ui.connect.api.KafkaConnectClientApi;
 import io.kafbat.ui.connect.model.Connector;
+import io.kafbat.ui.connect.model.ConnectorExpand;
 import io.kafbat.ui.connect.model.ConnectorPlugin;
 import io.kafbat.ui.connect.model.ConnectorPluginConfigValidationResponse;
 import io.kafbat.ui.connect.model.ConnectorStatus;
 import io.kafbat.ui.connect.model.ConnectorTask;
 import io.kafbat.ui.connect.model.ConnectorTopics;
+import io.kafbat.ui.connect.model.ExpandedConnector;
 import io.kafbat.ui.connect.model.NewConnector;
 import io.kafbat.ui.connect.model.TaskStatus;
 import io.kafbat.ui.exception.KafkaConnectConflictResponseException;

@@ -221,13 +223,17 @@ public Mono<ResponseEntity<Map<String, ConnectorTopics>>> getConnectorTopicsWith
 }

 @Override
-public Mono<List<String>> getConnectors(String search) throws WebClientResponseException {
-  return withRetryOnConflictOrRebalance(super.getConnectors(search));
+public Mono<Map<String, ExpandedConnector>> getConnectors(
+    String search, List<ConnectorExpand> expand
+) throws WebClientResponseException {
+  return withRetryOnConflictOrRebalance(super.getConnectors(search, expand));
 }

 @Override
-public Mono<ResponseEntity<List<String>>> getConnectorsWithHttpInfo(String search) throws WebClientResponseException {
-  return withRetryOnConflictOrRebalance(super.getConnectorsWithHttpInfo(search));
+public Mono<ResponseEntity<Map<String, ExpandedConnector>>> getConnectorsWithHttpInfo(
+    String search, List<ConnectorExpand> expand
+) throws WebClientResponseException {
+  return withRetryOnConflictOrRebalance(super.getConnectorsWithHttpInfo(search, expand));
 }

 @Override
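The `withRetryOnConflictOrRebalance` wrapper seen above retries Connect calls that fail with conflict/rebalance-style responses. The real client wraps reactive `Mono`s; as a rough, non-authoritative sketch of the same retry-only-this-failure idea in plain Java (the names `RetryOnConflict`, `withRetry`, and `ConflictException` are hypothetical, not the project's API):

```java
import java.util.function.Supplier;

// Illustrative sketch only: retry a call, but only when it fails with the
// specific "conflict" failure type; any other exception propagates immediately.
public class RetryOnConflict {
    // Stand-in for the 409/rebalance-style failures the real client retries on.
    static class ConflictException extends RuntimeException {}

    static <T> T withRetry(Supplier<T> call, int maxAttempts) {
        ConflictException last = null;
        for (int i = 0; i < maxAttempts; i++) {
            try {
                return call.get();
            } catch (ConflictException e) {
                last = e; // remember and retry; other exception types are not caught
            }
        }
        throw last; // attempts exhausted
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = withRetry(() -> {
            if (++calls[0] < 3) {
                throw new ConflictException();
            }
            return "ok";
        }, 5);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

In the reactive version, the same effect is typically achieved with a retry operator on the `Mono` rather than a loop.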

api/src/main/java/io/kafbat/ui/config/ClustersProperties.java

Lines changed: 5 additions & 5 deletions

@@ -227,7 +227,6 @@ public enum LogLevel {
 @AllArgsConstructor
 public static class CacheProperties {
   boolean enabled = true;
-  Duration connectCacheExpiry = Duration.ofMinutes(1);
   Duration connectClusterCacheExpiry = Duration.ofHours(24);
 }

@@ -237,6 +236,7 @@ public static class CacheProperties {
 public static class NgramProperties {
   int ngramMin = 1;
   int ngramMax = 4;
+  boolean distanceScore = true;
 }

 @Data

@@ -245,10 +245,10 @@ public static class NgramProperties {
 public static class ClusterFtsProperties {
   boolean enabled = true;
   boolean defaultEnabled = false;
-  NgramProperties schemas = new NgramProperties(1, 4);
-  NgramProperties consumers = new NgramProperties(1, 4);
-  NgramProperties connect = new NgramProperties(1, 4);
-  NgramProperties acl = new NgramProperties(1, 4);
+  NgramProperties schemas = new NgramProperties(1, 4, true);
+  NgramProperties consumers = new NgramProperties(1, 4, true);
+  NgramProperties connect = new NgramProperties(1, 4, true);
+  NgramProperties acl = new NgramProperties(1, 4, true);

 public boolean use(Boolean request) {
   if (enabled) {
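The `NgramProperties` above (`ngramMin`, `ngramMax`) drive full-text-search tokenization for schemas, consumers, connect, and ACLs. As a minimal sketch of character n-gram generation under those min/max semantics (illustrative only; the project's actual analyzer is not shown here, and `NgramSketch` is a hypothetical name):

```java
import java.util.ArrayList;
import java.util.List;

// Minimal character n-gram generator: for each length n in [min, max],
// emit every contiguous substring of that length.
public class NgramSketch {
    static List<String> ngrams(String text, int min, int max) {
        List<String> out = new ArrayList<>();
        for (int n = min; n <= max; n++) {
            for (int i = 0; i + n <= text.length(); i++) {
                out.add(text.substring(i, i + n));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // "kafka" with min=1, max=2: five 1-grams plus four 2-grams
        System.out.println(ngrams("kafka", 1, 2));
    }
}
```

With defaults of min 1 and max 4, even short search terms can match mid-word, at the cost of a larger index.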

api/src/main/java/io/kafbat/ui/config/McpConfig.java

Lines changed: 2 additions & 2 deletions

@@ -44,12 +44,12 @@ public McpAsyncServer mcpServer(WebFluxSseServerTransportProvider transport) {
 // Configure server capabilities with resource support
 var capabilities = McpSchema.ServerCapabilities.builder()
     .resources(false, true)
-    .tools(true) // Tool support with list changes notifications
+    .tools(true) // Tools support with list changes notifications
     .prompts(false) // Prompt support with list changes notifications
     .logging() // Logging support
     .build();

-// Create the server with both tool and resource capabilities
+// Create the server with both tools and resource capabilities
 return McpServer.async(transport)
     .serverInfo("Kafka UI MCP", "0.0.1")
     .capabilities(capabilities)

api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java

Lines changed: 17 additions & 11 deletions

@@ -23,6 +23,7 @@
 import io.kafbat.ui.service.mcp.McpTool;
 import java.util.Comparator;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import javax.validation.Valid;
 import lombok.RequiredArgsConstructor;

@@ -66,8 +67,12 @@ public Mono<ResponseEntity<Flux<String>>> getConnectors(String clusterName, Stri
     .build();

 return validateAccess(context)
-    .thenReturn(ResponseEntity.ok(kafkaConnectService.getConnectorNames(getCluster(clusterName), connectName)))
-    .doOnEach(sig -> audit(context, sig));
+    .thenReturn(
+        ResponseEntity.ok(
+            kafkaConnectService.getConnectors(getCluster(clusterName), connectName)
+                .flatMapMany(m -> Flux.fromIterable(m.keySet()))
+        )
+    ).doOnEach(sig -> audit(context, sig));
 }

 @Override

@@ -137,15 +142,18 @@ public Mono<ResponseEntity<Flux<FullConnectorInfoDTO>>> getAllConnectors(
     .operationName("getAllConnectors")
     .build();

+var maybeComparator = Optional.ofNullable(orderBy).map(this::getConnectorsComparator);
+
 var comparator = sortOrder == null || sortOrder.equals(SortOrderDTO.ASC)
-    ? getConnectorsComparator(orderBy)
-    : getConnectorsComparator(orderBy).reversed();
+    ? maybeComparator
+    : maybeComparator.map(Comparator::reversed);

-Flux<FullConnectorInfoDTO> job = kafkaConnectService.getAllConnectors(getCluster(clusterName), search, fts)
-    .filterWhen(dto -> accessControlService.isConnectAccessible(dto.getConnect(), clusterName))
-    .sort(comparator);
+Flux<FullConnectorInfoDTO> connectors = kafkaConnectService.getAllConnectors(getCluster(clusterName), search, fts)
+    .filterWhen(dto -> accessControlService.isConnectAccessible(dto.getConnect(), clusterName));

-return Mono.just(ResponseEntity.ok(job))
+Flux<FullConnectorInfoDTO> sorted = comparator.map(connectors::sort).orElse(connectors);
+
+return Mono.just(ResponseEntity.ok(sorted))
     .doOnEach(sig -> audit(context, sig));
 }

@@ -280,9 +288,7 @@ private Comparator<FullConnectorInfoDTO> getConnectorsComparator(ConnectorColumn
     FullConnectorInfoDTO::getName,
     Comparator.nullsFirst(Comparator.naturalOrder())
 );
-if (orderBy == null) {
-  return defaultComparator;
-}
+
 return switch (orderBy) {
   case CONNECT -> Comparator.comparing(
       FullConnectorInfoDTO::getConnect,
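The controller change above replaces a null check on `orderBy` with an `Optional<Comparator>`: when no sort column is requested, the stream is left unsorted instead of being forced through a default comparator. A standalone sketch of that pattern on plain lists (names like `OptionalSort` and `apply` are illustrative, not the project's API):

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Optional-based sorting: a null sort key means "keep original order",
// and descending order is expressed by mapping Comparator::reversed.
public class OptionalSort {
    static List<String> apply(List<String> items, Comparator<String> sortBy, boolean desc) {
        var maybe = Optional.ofNullable(sortBy);
        var cmp = desc ? maybe.map(Comparator::reversed) : maybe;
        // Sort only when a comparator is present; otherwise return items as-is.
        return cmp.map(c -> items.stream().sorted(c).toList()).orElse(items);
    }

    public static void main(String[] args) {
        List<String> in = Arrays.asList("b", "a", "c");
        System.out.println(apply(in, null, false));                     // original order kept
        System.out.println(apply(in, Comparator.naturalOrder(), true)); // reversed sort
    }
}
```

This mirrors `comparator.map(connectors::sort).orElse(connectors)` in the diff: the `Optional` carries the "maybe sort" decision instead of an `if`.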

api/src/main/java/io/kafbat/ui/controller/SchemasController.java

Lines changed: 13 additions & 6 deletions

@@ -25,6 +25,7 @@
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import javax.validation.Valid;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;

@@ -244,11 +245,15 @@ public Mono<ResponseEntity<SchemaSubjectsResponseDTO>> getSchemas(String cluster
 List<String> subjectsToRetrieve;
 boolean paginate = true;
-var schemaComparator = getComparatorForSchema(orderBy);
-final Comparator<SubjectWithCompatibilityLevel> comparator =
-    sortOrder == null || !sortOrder.equals(SortOrderDTO.DESC)
-        ? schemaComparator : schemaComparator.reversed();
+
+var schemaComparator = Optional.ofNullable(orderBy).map(this::getComparatorForSchema);
+var comparator = sortOrder == null || !sortOrder.equals(SortOrderDTO.DESC)
+    ? schemaComparator : schemaComparator.map(Comparator::reversed);
+
 if (orderBy == null || SchemaColumnsToSortDTO.SUBJECT.equals(orderBy)) {
+  if (orderBy != null) {
+    filteredSubjects.sort(Comparator.nullsFirst(Comparator.naturalOrder()));
+  }
   if (SortOrderDTO.DESC.equals(sortOrder)) {
     filteredSubjects.sort(Comparator.nullsFirst(Comparator.reverseOrder()));
   }

@@ -274,11 +279,13 @@ public Mono<ResponseEntity<SchemaSubjectsResponseDTO>> getSchemas(String cluster
 private List<SubjectWithCompatibilityLevel> paginateSchemas(
     List<SubjectWithCompatibilityLevel> subjects,
-    Comparator<SubjectWithCompatibilityLevel> comparator,
+    Optional<Comparator<SubjectWithCompatibilityLevel>> comparator,
     boolean paginate,
     int pageSize,
     int subjectToSkip) {
-  subjects.sort(comparator);
+
+  comparator.ifPresent(subjects::sort);
+
   if (paginate) {
     return subjects.subList(subjectToSkip, Math.min(subjectToSkip + pageSize, subjects.size()));
   } else {
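The `paginateSchemas` body above sorts (when a comparator is present) and then takes one page via `subList`, clamping the end index with `Math.min` so the final, short page never overruns the list. A self-contained sketch of that slicing step (`PageSlice` is a hypothetical name; this version also clamps the start index for safety, which the original does not need because it validates the skip count elsewhere):

```java
import java.util.List;

// Take one page out of a list: clamp both indices so a skip past the end
// yields an empty page and the last page may be shorter than pageSize.
public class PageSlice {
    static <T> List<T> page(List<T> items, int skip, int pageSize) {
        int from = Math.min(skip, items.size());
        int to = Math.min(from + pageSize, items.size());
        return items.subList(from, to);
    }

    public static void main(String[] args) {
        List<Integer> subjects = List.of(1, 2, 3, 4, 5);
        System.out.println(page(subjects, 3, 25)); // short final page
        System.out.println(page(subjects, 0, 2));  // first full page
    }
}
```

Without the `Math.min` clamp, requesting a page whose end falls past the last element would throw `IndexOutOfBoundsException` from `subList`.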

api/src/main/java/io/kafbat/ui/controller/TopicsController.java

Lines changed: 23 additions & 0 deletions

@@ -11,6 +11,7 @@
 import io.kafbat.ui.api.TopicsApi;
 import io.kafbat.ui.config.ClustersProperties;
 import io.kafbat.ui.mapper.ClusterMapper;
+import io.kafbat.ui.model.FullConnectorInfoDTO;
 import io.kafbat.ui.model.InternalTopic;
 import io.kafbat.ui.model.InternalTopicConfig;
 import io.kafbat.ui.model.PartitionsIncreaseDTO;

@@ -28,6 +29,8 @@
 import io.kafbat.ui.model.TopicUpdateDTO;
 import io.kafbat.ui.model.TopicsResponseDTO;
 import io.kafbat.ui.model.rbac.AccessContext;
+import io.kafbat.ui.model.rbac.permission.ConnectAction;
+import io.kafbat.ui.service.KafkaConnectService;
 import io.kafbat.ui.service.TopicsService;
 import io.kafbat.ui.service.analyze.TopicAnalysisService;
 import io.kafbat.ui.service.mcp.McpTool;

@@ -55,6 +58,7 @@ public class TopicsController extends AbstractController implements TopicsApi, M
 private final TopicAnalysisService topicAnalysisService;
 private final ClusterMapper clusterMapper;
 private final ClustersProperties clustersProperties;
+private final KafkaConnectService kafkaConnectService;

 @Override
 public Mono<ResponseEntity<TopicDTO>> createTopic(

@@ -370,4 +374,23 @@ private Comparator<InternalTopic> getComparatorForTopic(
     default -> defaultComparator;
   };
 }
+
+@Override
+public Mono<ResponseEntity<Flux<FullConnectorInfoDTO>>> getTopicConnectors(String clusterName,
+                                                                           String topicName,
+                                                                           ServerWebExchange exchange) {
+  var context = AccessContext.builder()
+      .cluster(clusterName)
+      .topicActions(topicName, VIEW)
+      .operationName("getTopicConnectors")
+      .operationParams(topicName)
+      .build();
+
+  Flux<FullConnectorInfoDTO> job = kafkaConnectService.getTopicConnectors(getCluster(clusterName), topicName)
+      .filterWhen(dto -> accessControlService.isConnectAccessible(dto.getConnect(), clusterName));
+
+  return validateAccess(context)
+      .then(Mono.just(ResponseEntity.ok(job)))
+      .doOnEach(sig -> audit(context, sig));
+}
 }
