Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions api/src/main/java/io/kafbat/ui/config/ClustersProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,19 @@ public class ClustersProperties {
CacheProperties cache = new CacheProperties();
ClusterFtsProperties fts = new ClusterFtsProperties();

AdminClient adminClient = new AdminClient();

@Data
public static class AdminClient {
Integer timeout;
int describeConsumerGroupsPartitionSize = 50;
int describeConsumerGroupsConcurrency = 4;
int listConsumerGroupOffsetsPartitionSize = 50;
int listConsumerGroupOffsetsConcurrency = 4;
int getTopicsConfigPartitionSize = 200;
int describeTopicsPartitionSize = 200;
}

@Data
public static class Cluster {
@NotBlank(message = "field name for for cluster could not be blank")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ public class AdminClientServiceImpl implements AdminClientService, Closeable {

private final Map<String, ReactiveAdminClient> adminClientCache = new ConcurrentHashMap<>();
private final int clientTimeout;
private final ClustersProperties clustersProperties;

public AdminClientServiceImpl(ClustersProperties clustersProperties) {
this.clustersProperties = clustersProperties;
this.clientTimeout = Optional.ofNullable(clustersProperties.getAdminClientTimeout())
.orElse(DEFAULT_CLIENT_TIMEOUT_MS);
}
Expand All @@ -53,7 +55,9 @@ private Mono<ReactiveAdminClient> createAdminClient(KafkaCluster cluster) {
);
return AdminClient.create(properties);
}).subscribeOn(Schedulers.boundedElastic())
.flatMap(ac -> ReactiveAdminClient.create(ac).doOnError(th -> ac.close()))
.flatMap(ac -> ReactiveAdminClient.create(ac, clustersProperties.getAdminClient())
.doOnError(th -> ac.close())
)
.onErrorMap(th -> new IllegalStateException(
"Error while creating AdminClient for the cluster " + cluster.getName(), th));
}
Expand Down
61 changes: 41 additions & 20 deletions api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.function.Predicate;
import java.util.function.ToIntFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -67,27 +69,46 @@ private Mono<List<InternalConsumerGroup>> getConsumerGroups(
public Mono<List<InternalTopicConsumerGroup>> getConsumerGroupsForTopic(KafkaCluster cluster,
String topic) {
return adminClientService.get(cluster)
// 1. getting topic's end offsets
.flatMap(ac -> ac.listTopicOffsets(topic, OffsetSpec.latest(), false)
.flatMap(endOffsets -> {
var tps = new ArrayList<>(endOffsets.keySet());
// 2. getting all consumer groups
return describeConsumerGroups(ac)
.flatMap((List<ConsumerGroupDescription> groups) -> {
// 3. trying to find committed offsets for topic
var groupNames = groups.stream().map(ConsumerGroupDescription::groupId).toList();
return ac.listConsumerGroupOffsets(groupNames, tps).map(offsets ->
groups.stream()
// 4. keeping only groups that relates to topic
.filter(g -> isConsumerGroupRelatesToTopic(topic, g, offsets.containsRow(g.groupId())))
.map(g ->
// 5. constructing results
InternalTopicConsumerGroup.create(topic, g, offsets.row(g.groupId()), endOffsets))
.toList()
);
}
);
}));
.flatMap(endOffsets ->
describeConsumerGroups(ac).flatMap(groups ->
filterConsumerGroups(ac, groups, topic, endOffsets)
)
)
);
}

private Mono<List<InternalTopicConsumerGroup>> filterConsumerGroups(
ReactiveAdminClient ac,
List<ConsumerGroupDescription> groups,
String topic,
Map<TopicPartition, Long> endOffsets) {
List<TopicPartition> partitions = new ArrayList<>(endOffsets.keySet());

Set<ConsumerGroupState> inactiveStates = Set.of(
ConsumerGroupState.DEAD,
ConsumerGroupState.EMPTY
);

Map<Boolean, List<ConsumerGroupDescription>> partitioned = groups.stream().collect(
Collectors.partitioningBy((g) -> !inactiveStates.contains(g.state()))
);

List<ConsumerGroupDescription> stable = partitioned.get(true).stream()
.filter(g -> isConsumerGroupRelatesToTopic(topic, g, false))
.toList();

List<ConsumerGroupDescription> filtered = new ArrayList<>();
filtered.addAll(stable);
filtered.addAll(partitioned.get(false));

List<String> groupIds = filtered.stream().map(ConsumerGroupDescription::groupId).toList();
return ac.listConsumerGroupOffsets(groupIds, partitions).map(offsets ->
filtered.stream().filter(g ->
isConsumerGroupRelatesToTopic(topic, g, offsets.containsRow(g.groupId()))
).map(g ->
InternalTopicConsumerGroup.create(topic, g, offsets.row(g.groupId()), endOffsets)
).toList());
}

private boolean isConsumerGroupRelatesToTopic(String topic,
Expand Down
21 changes: 12 additions & 9 deletions api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.google.common.collect.ImmutableTable;
import com.google.common.collect.Iterables;
import com.google.common.collect.Table;
import io.kafbat.ui.config.ClustersProperties;
import io.kafbat.ui.exception.IllegalEntityStateException;
import io.kafbat.ui.exception.NotFoundException;
import io.kafbat.ui.exception.ValidationException;
Expand Down Expand Up @@ -88,7 +89,6 @@
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.quota.ClientQuotaFilter;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -190,9 +190,11 @@ private static Mono<ConfigRelatedInfo> extract(AdminClient ac) {
}
}

public static Mono<ReactiveAdminClient> create(AdminClient adminClient) {
public static Mono<ReactiveAdminClient> create(AdminClient adminClient, ClustersProperties.AdminClient properties) {
Mono<ConfigRelatedInfo> configRelatedInfoMono = ConfigRelatedInfo.extract(adminClient);
return configRelatedInfoMono.map(info -> new ReactiveAdminClient(adminClient, configRelatedInfoMono, info));
return configRelatedInfoMono.map(info ->
new ReactiveAdminClient(adminClient, configRelatedInfoMono, properties, info)
);
}


Expand Down Expand Up @@ -235,6 +237,7 @@ public static <T> Mono<T> toMono(KafkaFuture<T> future) {
@Getter(AccessLevel.PACKAGE) // visible for testing
private final AdminClient client;
private final Mono<ConfigRelatedInfo> configRelatedInfoMono;
private final ClustersProperties.AdminClient properties;

private volatile ConfigRelatedInfo configRelatedInfo;

Expand Down Expand Up @@ -280,7 +283,7 @@ public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig(Collection<String> t
// we need to partition calls, because it can lead to AdminClient timeouts in case of large topics count
return partitionCalls(
topicNames,
200,
properties.getGetTopicsConfigPartitionSize(),
part -> getTopicsConfigImpl(part, includeDocFixed),
mapMerger()
);
Expand Down Expand Up @@ -348,7 +351,7 @@ public Mono<Map<String, TopicDescription>> describeTopics(Collection<String> top
// we need to partition calls, because it can lead to AdminClient timeouts in case of large topics count
return partitionCalls(
topics,
200,
properties.getDescribeTopicsPartitionSize(),
this::describeTopicsImpl,
mapMerger()
);
Expand Down Expand Up @@ -517,8 +520,8 @@ public Mono<Collection<ConsumerGroupListing>> listConsumerGroups() {
public Mono<Map<String, ConsumerGroupDescription>> describeConsumerGroups(Collection<String> groupIds) {
return partitionCalls(
groupIds,
25,
4,
properties.getDescribeConsumerGroupsPartitionSize(),
properties.getDescribeConsumerGroupsConcurrency(),
ids -> toMono(client.describeConsumerGroups(ids).all()),
mapMerger()
);
Expand All @@ -541,8 +544,8 @@ public Mono<Table<String, TopicPartition, Long>> listConsumerGroupOffsets(List<S

Mono<Map<String, Map<TopicPartition, OffsetAndMetadata>>> merged = partitionCalls(
consumerGroups,
25,
4,
properties.getListConsumerGroupOffsetsPartitionSize(),
properties.getListConsumerGroupOffsetsConcurrency(),
call,
mapMerger()
);
Expand Down
Loading