Skip to content

Commit f2fabd3

Browse files
authored
BE: Full text search support (#1267)
1 parent 063c3fb commit f2fabd3

33 files changed

+1044
-80
lines changed

api/build.gradle

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ dependencies {
5454
antlr libs.antlr
5555
implementation libs.antlr.runtime
5656

57+
implementation libs.lucene
58+
implementation libs.lucene.queryparser
59+
implementation libs.lucene.analysis.common
60+
5761
implementation libs.opendatadiscovery.oddrn
5862
implementation(libs.opendatadiscovery.client) {
5963
exclude group: 'org.springframework.boot', module: 'spring-boot-starter-webflux'
@@ -74,6 +78,7 @@ dependencies {
7478
// END Fixes https://www.cve.org/CVERecord?id=CVE-2025-58056 and https://www.cve.org/CVERecord?id=CVE-2025-58057
7579
// CVE Fixes End
7680

81+
7782
implementation libs.modelcontextprotocol.spring.webflux
7883
implementation libs.victools.jsonschema.generator
7984

api/src/main/java/io/kafbat/ui/config/ClustersProperties.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public class ClustersProperties {
4141
MetricsStorage defaultMetricsStorage = new MetricsStorage();
4242

4343
CacheProperties cache = new CacheProperties();
44+
ClusterFtsProperties fts = new ClusterFtsProperties();
4445

4546
@Data
4647
public static class Cluster {
@@ -217,6 +218,25 @@ public static class CacheProperties {
217218
Duration connectClusterCacheExpiry = Duration.ofHours(24);
218219
}
219220

221+
@Data
222+
@NoArgsConstructor
223+
@AllArgsConstructor
224+
public static class NgramProperties {
225+
int ngramMin = 1;
226+
int ngramMax = 4;
227+
}
228+
229+
@Data
230+
@NoArgsConstructor
231+
@AllArgsConstructor
232+
public static class ClusterFtsProperties {
233+
boolean enabled = false;
234+
NgramProperties schemas = new NgramProperties(1, 4);
235+
NgramProperties consumers = new NgramProperties(1, 4);
236+
NgramProperties connect = new NgramProperties(1, 4);
237+
NgramProperties acl = new NgramProperties(1, 4);
238+
}
239+
220240
@PostConstruct
221241
public void validateAndSetDefaults() {
222242
if (clusters != null) {

api/src/main/java/io/kafbat/ui/controller/SchemasController.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
package io.kafbat.ui.controller;
22

3-
import static org.apache.commons.lang3.Strings.CI;
4-
53
import io.kafbat.ui.api.SchemasApi;
4+
import io.kafbat.ui.config.ClustersProperties;
65
import io.kafbat.ui.exception.ValidationException;
76
import io.kafbat.ui.mapper.KafkaSrMapper;
87
import io.kafbat.ui.mapper.KafkaSrMapperImpl;
@@ -15,13 +14,13 @@
1514
import io.kafbat.ui.model.rbac.AccessContext;
1615
import io.kafbat.ui.model.rbac.permission.SchemaAction;
1716
import io.kafbat.ui.service.SchemaRegistryService;
17+
import io.kafbat.ui.service.index.SchemasFilter;
1818
import io.kafbat.ui.service.mcp.McpTool;
1919
import java.util.List;
2020
import java.util.Map;
2121
import javax.validation.Valid;
2222
import lombok.RequiredArgsConstructor;
2323
import lombok.extern.slf4j.Slf4j;
24-
import org.apache.commons.lang3.StringUtils;
2524
import org.springframework.http.ResponseEntity;
2625
import org.springframework.web.bind.annotation.RestController;
2726
import org.springframework.web.server.ServerWebExchange;
@@ -38,6 +37,7 @@ public class SchemasController extends AbstractController implements SchemasApi,
3837
private final KafkaSrMapper kafkaSrMapper = new KafkaSrMapperImpl();
3938

4039
private final SchemaRegistryService schemaRegistryService;
40+
private final ClustersProperties clustersProperties;
4141

4242
@Override
4343
protected KafkaCluster getCluster(String clusterName) {
@@ -214,6 +214,8 @@ public Mono<ResponseEntity<SchemaSubjectsResponseDTO>> getSchemas(String cluster
214214
.operationName("getSchemas")
215215
.build();
216216

217+
ClustersProperties.ClusterFtsProperties fts = clustersProperties.getFts();
218+
217219
return schemaRegistryService
218220
.getAllSubjectNames(getCluster(clusterName))
219221
.flatMapIterable(l -> l)
@@ -222,10 +224,10 @@ public Mono<ResponseEntity<SchemaSubjectsResponseDTO>> getSchemas(String cluster
222224
.flatMap(subjects -> {
223225
int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE;
224226
int subjectToSkip = ((pageNum != null && pageNum > 0 ? pageNum : 1) - 1) * pageSize;
225-
List<String> filteredSubjects = subjects
226-
.stream()
227-
.filter(subj -> search == null || CI.contains(subj, search))
228-
.sorted().toList();
227+
228+
SchemasFilter filter = new SchemasFilter(subjects, fts.isEnabled(), fts.getSchemas());
229+
List<String> filteredSubjects = filter.find(search);
230+
229231
var totalPages = (filteredSubjects.size() / pageSize)
230232
+ (filteredSubjects.size() % pageSize == 0 ? 0 : 1);
231233
List<String> subjectsToRender = filteredSubjects.stream()

api/src/main/java/io/kafbat/ui/controller/TopicsController.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import static org.apache.commons.lang3.Strings.CI;
1111

1212
import io.kafbat.ui.api.TopicsApi;
13+
import io.kafbat.ui.config.ClustersProperties;
1314
import io.kafbat.ui.mapper.ClusterMapper;
1415
import io.kafbat.ui.model.InternalTopic;
1516
import io.kafbat.ui.model.InternalTopicConfig;
@@ -37,7 +38,6 @@
3738
import javax.validation.Valid;
3839
import lombok.RequiredArgsConstructor;
3940
import lombok.extern.slf4j.Slf4j;
40-
import org.apache.commons.lang3.StringUtils;
4141
import org.springframework.http.HttpStatus;
4242
import org.springframework.http.ResponseEntity;
4343
import org.springframework.web.bind.annotation.RestController;
@@ -55,6 +55,7 @@ public class TopicsController extends AbstractController implements TopicsApi, M
5555
private final TopicsService topicsService;
5656
private final TopicAnalysisService topicAnalysisService;
5757
private final ClusterMapper clusterMapper;
58+
private final ClustersProperties clustersProperties;
5859

5960
@Override
6061
public Mono<ResponseEntity<TopicDTO>> createTopic(
@@ -181,23 +182,23 @@ public Mono<ResponseEntity<TopicsResponseDTO>> getTopics(String clusterName,
181182
.operationName("getTopics")
182183
.build();
183184

184-
return topicsService.getTopicsForPagination(getCluster(clusterName))
185+
return topicsService.getTopicsForPagination(getCluster(clusterName), search, showInternal)
185186
.flatMap(topics -> accessControlService.filterViewableTopics(topics, clusterName))
186187
.flatMap(topics -> {
187188
int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE;
188189
var topicsToSkip = ((page != null && page > 0 ? page : 1) - 1) * pageSize;
190+
ClustersProperties.ClusterFtsProperties fts = clustersProperties.getFts();
191+
Comparator<InternalTopic> comparatorForTopic = getComparatorForTopic(orderBy, fts.isEnabled());
189192
var comparator = sortOrder == null || !sortOrder.equals(SortOrderDTO.DESC)
190-
? getComparatorForTopic(orderBy) : getComparatorForTopic(orderBy).reversed();
191-
List<InternalTopic> filtered = topics.stream()
192-
.filter(topic -> !topic.isInternal()
193-
|| showInternal != null && showInternal)
194-
.filter(topic -> search == null || CI.contains(topic.getName(), search))
195-
.sorted(comparator)
196-
.toList();
193+
? comparatorForTopic : comparatorForTopic.reversed();
194+
195+
List<InternalTopic> filtered = topics.stream().sorted(comparator).toList();
196+
197197
var totalPages = (filtered.size() / pageSize)
198198
+ (filtered.size() % pageSize == 0 ? 0 : 1);
199199

200200
List<String> topicsPage = filtered.stream()
201+
.filter(t -> !t.isInternal() || showInternal != null && showInternal)
201202
.skip(topicsToSkip)
202203
.limit(pageSize)
203204
.map(InternalTopic::getName)
@@ -348,9 +349,12 @@ public Mono<ResponseEntity<Flux<TopicProducerStateDTO>>> getActiveProducerStates
348349
}
349350

350351
private Comparator<InternalTopic> getComparatorForTopic(
351-
TopicColumnsToSortDTO orderBy) {
352+
TopicColumnsToSortDTO orderBy,
353+
boolean ftsEnabled) {
352354
var defaultComparator = Comparator.comparing(InternalTopic::getName);
353-
if (orderBy == null) {
355+
if (orderBy == null && ftsEnabled) {
356+
return (o1, o2) -> 0;
357+
} else if (orderBy == null) {
354358
return defaultComparator;
355359
}
356360
return switch (orderBy) {

api/src/main/java/io/kafbat/ui/model/InternalTopic.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,16 @@ public class InternalTopic {
3838
private final long segmentSize;
3939
private final long segmentCount;
4040

41+
42+
public InternalTopic withMetrics(Metrics metrics) {
43+
var builder = toBuilder();
44+
if (metrics != null) {
45+
builder.bytesInPerSec(metrics.getIoRates().topicBytesInPerSec().get(this.name));
46+
builder.bytesOutPerSec(metrics.getIoRates().topicBytesOutPerSec().get(this.name));
47+
}
48+
return builder.build();
49+
}
50+
4151
public static InternalTopic from(TopicDescription topicDescription,
4252
List<ConfigEntry> configs,
4353
InternalPartitionsOffsets partitionsOffsets,
@@ -113,8 +123,10 @@ public static InternalTopic from(TopicDescription topicDescription,
113123
topic.segmentSize(stats.getSegmentSize());
114124
});
115125

116-
topic.bytesInPerSec(metrics.getIoRates().topicBytesInPerSec().get(topicDescription.name()));
117-
topic.bytesOutPerSec(metrics.getIoRates().topicBytesOutPerSec().get(topicDescription.name()));
126+
if (metrics != null) {
127+
topic.bytesInPerSec(metrics.getIoRates().topicBytesInPerSec().get(topicDescription.name()));
128+
topic.bytesOutPerSec(metrics.getIoRates().topicBytesOutPerSec().get(topicDescription.name()));
129+
}
118130

119131
topic.topicConfigs(
120132
configs.stream().map(InternalTopicConfig::from).collect(Collectors.toList()));

api/src/main/java/io/kafbat/ui/model/Statistics.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
@Value
1313
@Builder(toBuilder = true)
14-
public class Statistics {
14+
public class Statistics implements AutoCloseable {
1515
ServerStatusDTO status;
1616
Throwable lastKafkaException;
1717
String version;
@@ -46,4 +46,11 @@ public Stream<TopicDescription> topicDescriptions() {
4646
public Statistics withClusterState(UnaryOperator<ScrapedClusterState> stateUpdate) {
4747
return toBuilder().clusterState(stateUpdate.apply(clusterState)).build();
4848
}
49+
50+
@Override
51+
public void close() throws Exception {
52+
if (clusterState != null) {
53+
clusterState.close();
54+
}
55+
}
4956
}

api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
package io.kafbat.ui.service;
22

3-
import static org.apache.commons.lang3.Strings.CI;
4-
53
import com.google.common.collect.Streams;
64
import com.google.common.collect.Table;
5+
import io.kafbat.ui.config.ClustersProperties;
76
import io.kafbat.ui.emitter.EnhancedConsumer;
87
import io.kafbat.ui.model.ConsumerGroupOrderingDTO;
98
import io.kafbat.ui.model.InternalConsumerGroup;
109
import io.kafbat.ui.model.InternalTopicConsumerGroup;
1110
import io.kafbat.ui.model.KafkaCluster;
1211
import io.kafbat.ui.model.SortOrderDTO;
12+
import io.kafbat.ui.service.index.ConsumerGroupFilter;
1313
import io.kafbat.ui.service.rbac.AccessControlService;
1414
import io.kafbat.ui.util.ApplicationMetrics;
1515
import io.kafbat.ui.util.KafkaClientSslPropertiesUtil;
@@ -25,7 +25,6 @@
2525
import java.util.stream.Stream;
2626
import javax.annotation.Nullable;
2727
import lombok.RequiredArgsConstructor;
28-
import org.apache.commons.lang3.StringUtils;
2928
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
3029
import org.apache.kafka.clients.admin.ConsumerGroupListing;
3130
import org.apache.kafka.clients.admin.OffsetSpec;
@@ -41,6 +40,7 @@ public class ConsumerGroupService {
4140

4241
private final AdminClientService adminClientService;
4342
private final AccessControlService accessControlService;
43+
private final ClustersProperties clustersProperties;
4444

4545
private Mono<List<InternalConsumerGroup>> getConsumerGroups(
4646
ReactiveAdminClient ac,
@@ -114,11 +114,7 @@ public Mono<ConsumerGroupsPage> getConsumerGroupsPage(
114114
SortOrderDTO sortOrderDto) {
115115
return adminClientService.get(cluster).flatMap(ac ->
116116
ac.listConsumerGroups()
117-
.map(listing -> search == null
118-
? listing
119-
: listing.stream()
120-
.filter(g -> CI.contains(g.groupId(), search))
121-
.toList()
117+
.map(listing -> filterGroups(listing, search)
122118
)
123119
.flatMapIterable(lst -> lst)
124120
.filterWhen(cg -> accessControlService.isConsumerGroupAccessible(cg.groupId(), cluster.getName()))
@@ -131,6 +127,12 @@ public Mono<ConsumerGroupsPage> getConsumerGroupsPage(
131127
(allGroups.size() / perPage) + (allGroups.size() % perPage == 0 ? 0 : 1))))));
132128
}
133129

130+
private Collection<ConsumerGroupListing> filterGroups(Collection<ConsumerGroupListing> groups, String search) {
131+
ClustersProperties.ClusterFtsProperties fts = clustersProperties.getFts();
132+
ConsumerGroupFilter filter = new ConsumerGroupFilter(groups, fts.isEnabled(), fts.getConsumers());
133+
return filter.find(search, false);
134+
}
135+
134136
private Mono<List<ConsumerGroupDescription>> loadSortedDescriptions(ReactiveAdminClient ac,
135137
List<ConsumerGroupListing> groups,
136138
int pageNum,

api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import io.kafbat.ui.model.NewConnectorDTO;
2828
import io.kafbat.ui.model.TaskDTO;
2929
import io.kafbat.ui.model.connect.InternalConnectorInfo;
30+
import io.kafbat.ui.service.index.KafkaConnectNgramFilter;
3031
import io.kafbat.ui.util.ReactiveFailover;
3132
import jakarta.validation.Valid;
3233
import java.util.List;
@@ -151,15 +152,16 @@ public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
151152
.topics(tuple.getT4().getTopics())
152153
.build())))
153154
.map(kafkaConnectMapper::fullConnectorInfo)
154-
.filter(matchesSearchTerm(search));
155+
.collectList()
156+
.map(lst -> filterConnectors(lst, search))
157+
.flatMapMany(Flux::fromIterable);
155158
}
156159

157-
private Predicate<FullConnectorInfoDTO> matchesSearchTerm(@Nullable final String search) {
158-
if (search == null) {
159-
return c -> true;
160-
}
161-
return connector -> getStringsForSearch(connector)
162-
.anyMatch(string -> CI.contains(string, search));
160+
private List<FullConnectorInfoDTO> filterConnectors(List<FullConnectorInfoDTO> connectors, String search) {
161+
ClustersProperties.ClusterFtsProperties fts = clustersProperties.getFts();
162+
KafkaConnectNgramFilter filter =
163+
new KafkaConnectNgramFilter(connectors, fts.isEnabled(), fts.getConnect());
164+
return filter.find(search);
163165
}
164166

165167
private Stream<String> getStringsForSearch(FullConnectorInfoDTO fullConnectorInfo) {

api/src/main/java/io/kafbat/ui/service/StatisticsCache.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,20 @@
11
package io.kafbat.ui.service;
22

3+
import io.kafbat.ui.config.ClustersProperties;
34
import io.kafbat.ui.model.InternalPartitionsOffsets;
45
import io.kafbat.ui.model.KafkaCluster;
6+
import io.kafbat.ui.model.ServerStatusDTO;
57
import io.kafbat.ui.model.Statistics;
68
import java.util.List;
79
import java.util.Map;
810
import java.util.Objects;
911
import java.util.concurrent.ConcurrentHashMap;
12+
import lombok.extern.slf4j.Slf4j;
1013
import org.apache.kafka.clients.admin.ConfigEntry;
1114
import org.apache.kafka.clients.admin.TopicDescription;
1215
import org.springframework.stereotype.Component;
1316

17+
@Slf4j
1418
@Component
1519
public class StatisticsCache {
1620

@@ -28,12 +32,22 @@ public synchronized void replace(KafkaCluster c, Statistics stats) {
2832
public synchronized void update(KafkaCluster c,
2933
Map<String, TopicDescription> descriptions,
3034
Map<String, List<ConfigEntry>> configs,
31-
InternalPartitionsOffsets partitionsOffsets) {
35+
InternalPartitionsOffsets partitionsOffsets,
36+
ClustersProperties clustersProperties) {
3237
var stats = get(c);
3338
replace(
3439
c,
35-
stats.withClusterState(s -> s.updateTopics(descriptions, configs, partitionsOffsets))
40+
stats.withClusterState(s ->
41+
s.updateTopics(descriptions, configs, partitionsOffsets, clustersProperties)
42+
)
3643
);
44+
try {
45+
if (!stats.getStatus().equals(ServerStatusDTO.INITIALIZING)) {
46+
stats.close();
47+
}
48+
} catch (Exception e) {
49+
log.error("Error closing cluster {} stats", c.getName(), e);
50+
}
3751
}
3852

3953
public synchronized void onTopicDelete(KafkaCluster c, String topic) {

api/src/main/java/io/kafbat/ui/service/StatisticsService.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import static io.kafbat.ui.service.ReactiveAdminClient.ClusterDescription;
44

5+
import io.kafbat.ui.config.ClustersProperties;
56
import io.kafbat.ui.model.ClusterFeature;
67
import io.kafbat.ui.model.KafkaCluster;
78
import io.kafbat.ui.model.Metrics;
@@ -22,6 +23,7 @@ public class StatisticsService {
2223
private final AdminClientService adminClientService;
2324
private final FeatureService featureService;
2425
private final StatisticsCache cache;
26+
private final ClustersProperties clustersProperties;
2527

2628
public Mono<Statistics> updateCache(KafkaCluster c) {
2729
return getStatistics(c).doOnSuccess(m -> cache.replace(c, m));
@@ -62,7 +64,7 @@ private Statistics createStats(ClusterDescription description,
6264

6365
private Mono<ScrapedClusterState> loadClusterState(ClusterDescription clusterDescription,
6466
ReactiveAdminClient ac) {
65-
return ScrapedClusterState.scrape(clusterDescription, ac);
67+
return ScrapedClusterState.scrape(clusterDescription, ac, clustersProperties);
6668
}
6769

6870
private Mono<Metrics> scrapeMetrics(KafkaCluster cluster,

0 commit comments

Comments
 (0)