diff --git a/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java b/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java index 0efe5e827..b043697c6 100644 --- a/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java +++ b/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java @@ -16,6 +16,8 @@ import io.kafbat.ui.util.MetadataVersion; import io.kafbat.ui.util.annotation.KafkaClientInternalsDependant; import java.io.Closeable; +import java.io.PrintWriter; +import java.io.StringWriter; import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.ArrayList; @@ -88,7 +90,6 @@ import org.apache.kafka.common.quota.ClientQuotaAlteration; import org.apache.kafka.common.quota.ClientQuotaEntity; import org.apache.kafka.common.quota.ClientQuotaFilter; -import org.apache.kafka.common.requests.DescribeLogDirsResponse; import org.apache.kafka.common.resource.ResourcePatternFilter; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -192,7 +193,7 @@ private static Mono extract(AdminClient ac) { public static Mono create(AdminClient adminClient) { Mono configRelatedInfoMono = ConfigRelatedInfo.extract(adminClient); - return configRelatedInfoMono.map(info -> new ReactiveAdminClient(adminClient, configRelatedInfoMono, info)); + return configRelatedInfoMono.map(info -> new ReactiveAdminClient(adminClient, configRelatedInfoMono, info, null)); } @@ -230,6 +231,23 @@ public static Mono toMono(KafkaFuture future) { .publishOn(Schedulers.parallel()); } + private record ClosedContext(StackTraceElement[] stackTrace) { + @Override + public String toString() { + // Convert stack trace to a string for logging + StringWriter sw = new StringWriter(); + PrintWriter pw = new PrintWriter(sw); + for (StackTraceElement element : stackTrace) { + pw.println("\tat " + element); + } + return sw.toString(); + } + + static ClosedContext capture() { + return new ClosedContext(Thread.currentThread().getStackTrace()); + } + } + //--------------------------------------------------------------------------------- @Getter(AccessLevel.PACKAGE) // visible for testing @@ -237,16 +255,19 @@ public static Mono toMono(KafkaFuture future) { private final Mono configRelatedInfoMono; private volatile ConfigRelatedInfo configRelatedInfo; + private volatile ClosedContext closedContext; public Set getClusterFeatures() { return configRelatedInfo.features(); } public Mono> listTopics(boolean listInternal) { + checkClosed(); return toMono(client.listTopics(new ListTopicsOptions().listInternal(listInternal)).names()); } public Mono deleteTopic(String topicName) { + checkClosed(); return toMono(client.deleteTopics(List.of(topicName)).all()); } @@ -287,6 +308,8 @@ public Mono>> getTopicsConfig(Collection t } private Mono>> getTopicsConfigImpl(Collection topicNames, boolean includeDoc) { + checkClosed(); + List resources = topicNames.stream() .map(topicName -> new ConfigResource(ConfigResource.Type.TOPIC, topicName)) .collect(toList()); @@ -337,6 +360,8 @@ private static Mono>> loadBrokersConfig(AdminClie * Return per-broker configs or empty map if broker's configs retrieval not supported. */ public Mono>> loadBrokersConfig(List brokerIds) { + checkClosed(); + return loadBrokersConfig(client, brokerIds); } @@ -355,6 +380,8 @@ public Mono> describeTopics(Collection top } private Mono> describeTopicsImpl(Collection topics) { + checkClosed(); + return toMonoWithExceptionFilter( client.describeTopics(topics).topicNameValues(), UnknownTopicOrPartitionException.class, @@ -411,6 +438,8 @@ static Mono> toMonoWithExceptionFilter(Map> v public Mono>> describeLogDirs( Collection brokerIds) { + checkClosed(); + return toMono(client.describeLogDirs(brokerIds).allDescriptions()) .onErrorResume(UnsupportedVersionException.class, th -> Mono.just(Map.of())) .onErrorResume(ClusterAuthorizationException.class, th -> Mono.just(Map.of())) @@ -421,6 +450,8 @@ public Mono>> describeLogDirs( } public Mono describeCluster() { + checkClosed(); + return describeClusterImpl(client, getClusterFeatures()); } @@ -444,6 +475,8 @@ private static Mono describeClusterImpl(AdminClient client, } public Mono deleteConsumerGroups(Collection groupIds) { + checkClosed(); + return toMono(client.deleteConsumerGroups(groupIds).all()) .onErrorResume(GroupIdNotFoundException.class, th -> Mono.error(new NotFoundException("The group id does not exist"))) @@ -452,6 +485,8 @@ public Mono deleteConsumerGroups(Collection groupIds) { } public Mono deleteConsumerGroupOffsets(String groupId, String topicName) { + checkClosed(); + return listConsumerGroupOffsets(List.of(groupId), null) .flatMap(table -> { // filter TopicPartitions by topicName @@ -476,6 +511,8 @@ public Mono createTopic(String name, int numPartitions, @Nullable Integer replicationFactor, Map configs) { + checkClosed(); + var newTopic = new NewTopic( name, Optional.of(numPartitions), @@ -486,10 +523,14 @@ public Mono createTopic(String name, public Mono alterPartitionReassignments( Map> reassignments) { + checkClosed(); + return toMono(client.alterPartitionReassignments(reassignments).all()); } public Mono createPartitions(Map newPartitionsMap) { + checkClosed(); + return toMono(client.createPartitions(newPartitionsMap).all()); } @@ -511,10 +552,14 @@ public Mono> listConsumerGroupNames() { } public Mono> listConsumerGroups() { + checkClosed(); + return toMono(client.listConsumerGroups().all()); } public Mono> describeConsumerGroups(Collection groupIds) { + checkClosed(); + return partitionCalls( groupIds, 25, @@ -529,6 +574,8 @@ public Mono> describeConsumerGroups(Collec public Mono> listConsumerGroupOffsets(List consumerGroups, // all partitions if null passed @Nullable List partitions) { + checkClosed(); + Function, Mono>>> call = groups -> toMono( client.listConsumerGroupOffsets( @@ -560,6 +607,8 @@ public Mono> listConsumerGroupOffsets(List alterConsumerGroupOffsets(String groupId, Map offsets) { + checkClosed(); + return toMono(client.alterConsumerGroupOffsets( groupId, offsets.entrySet().stream() @@ -646,6 +695,8 @@ static Set filterPartitionsWithLeaderCheck(Collection> listOffsetsUnsafe(Collection partitions, OffsetSpec offsetSpec) { + checkClosed(); + if (partitions.isEmpty()) { return Mono.just(Map.of()); } @@ -672,28 +723,35 @@ Mono> listOffsetsUnsafe(Collection par } public Mono> listAcls(ResourcePatternFilter filter) { + checkClosed(); + Preconditions.checkArgument(getClusterFeatures().contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED)); return toMono(client.describeAcls(new AclBindingFilter(filter, AccessControlEntryFilter.ANY)).values()); } public Mono createAcls(Collection aclBindings) { + checkClosed(); + Preconditions.checkArgument(getClusterFeatures().contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED)); return toMono(client.createAcls(aclBindings).all()); } public Mono deleteAcls(Collection aclBindings) { + checkClosed(); Preconditions.checkArgument(getClusterFeatures().contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED)); var filters = aclBindings.stream().map(AclBinding::toFilter).collect(Collectors.toSet()); return toMono(client.deleteAcls(filters).all()).then(); } public Mono updateBrokerConfigByName(Integer brokerId, String name, String value) { + checkClosed(); ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(brokerId)); AlterConfigOp op = new AlterConfigOp(new ConfigEntry(name, value), AlterConfigOp.OpType.SET); return toMono(client.incrementalAlterConfigs(Map.of(cr, List.of(op))).all()); } public Mono deleteRecords(Map offsets) { + checkClosed(); var records = offsets.entrySet().stream() .map(entry -> Map.entry(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue()))) .collect(toMap(Map.Entry::getKey, Map.Entry::getValue)); @@ -701,20 +759,24 @@ public Mono deleteRecords(Map offsets) { } public Mono alterReplicaLogDirs(Map replicaAssignment) { + checkClosed(); return toMono(client.alterReplicaLogDirs(replicaAssignment).all()); } public Mono>> getClientQuotas(ClientQuotaFilter filter) { + checkClosed(); return toMono(client.describeClientQuotas(filter).entities()); } public Mono alterClientQuota(ClientQuotaAlteration alteration) { + checkClosed(); return toMono(client.alterClientQuotas(List.of(alteration)).all()); } // returns tp -> list of active producer's states (if any) public Mono>> getActiveProducersState(String topic) { + checkClosed(); return describeTopic(topic) .map(td -> client.describeProducers( IntStream.range(0, td.partitions().size()) @@ -731,6 +793,7 @@ public Mono>> getActiveProducersState(St private Mono incrementalAlterConfig(String topicName, List currentConfigs, Map newConfigs) { + checkClosed(); var configsToDelete = currentConfigs.stream() .filter(e -> e.source() == ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG) //manually set configs only .filter(e -> !newConfigs.containsKey(e.name())) @@ -748,6 +811,7 @@ private Mono incrementalAlterConfig(String topicName, @SuppressWarnings("deprecation") private Mono alterConfig(String topicName, Map configs) { + checkClosed(); List configEntries = configs.entrySet().stream() .flatMap(cfg -> Stream.of(new ConfigEntry(cfg.getKey(), cfg.getValue()))) .collect(toList()); @@ -800,8 +864,15 @@ private static BiFunction, Map, Map> mapMerger() { }; } + private void checkClosed() { + if (this.closedContext != null) { + log.error("AdminClient is already closed at:\n{}", closedContext); + } + } + @Override public void close() { + this.closedContext = ClosedContext.capture(); client.close(); } }