Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 73 additions & 2 deletions api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import io.kafbat.ui.util.MetadataVersion;
import io.kafbat.ui.util.annotation.KafkaClientInternalsDependant;
import java.io.Closeable;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
Expand Down Expand Up @@ -88,7 +90,6 @@
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.quota.ClientQuotaFilter;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -192,7 +193,7 @@ private static Mono<ConfigRelatedInfo> extract(AdminClient ac) {

public static Mono<ReactiveAdminClient> create(AdminClient adminClient) {
Mono<ConfigRelatedInfo> configRelatedInfoMono = ConfigRelatedInfo.extract(adminClient);
return configRelatedInfoMono.map(info -> new ReactiveAdminClient(adminClient, configRelatedInfoMono, info));
return configRelatedInfoMono.map(info -> new ReactiveAdminClient(adminClient, configRelatedInfoMono, info, null));
}


Expand Down Expand Up @@ -230,23 +231,43 @@ public static <T> Mono<T> toMono(KafkaFuture<T> future) {
.publishOn(Schedulers.parallel());
}

private record ClosedContext(StackTraceElement[] stackTrace) {
@Override
public String toString() {
// Convert stack trace to a string for logging
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
for (StackTraceElement element : stackTrace) {
pw.println("\tat " + element);
}
return sw.toString();
}

static ClosedContext capture() {
return new ClosedContext(Thread.currentThread().getStackTrace());
}
}

//---------------------------------------------------------------------------------

@Getter(AccessLevel.PACKAGE) // visible for testing
private final AdminClient client;
private final Mono<ConfigRelatedInfo> configRelatedInfoMono;

private volatile ConfigRelatedInfo configRelatedInfo;
private volatile ClosedContext closedContext;

public Set<SupportedFeature> getClusterFeatures() {
return configRelatedInfo.features();
}

public Mono<Set<String>> listTopics(boolean listInternal) {
checkClosed();
return toMono(client.listTopics(new ListTopicsOptions().listInternal(listInternal)).names());
}

public Mono<Void> deleteTopic(String topicName) {
checkClosed();
return toMono(client.deleteTopics(List.of(topicName)).all());
}

Expand Down Expand Up @@ -287,6 +308,8 @@ public Mono<Map<String, List<ConfigEntry>>> getTopicsConfig(Collection<String> t
}

private Mono<Map<String, List<ConfigEntry>>> getTopicsConfigImpl(Collection<String> topicNames, boolean includeDoc) {
checkClosed();

List<ConfigResource> resources = topicNames.stream()
.map(topicName -> new ConfigResource(ConfigResource.Type.TOPIC, topicName))
.collect(toList());
Expand Down Expand Up @@ -337,6 +360,8 @@ private static Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(AdminClie
* Return per-broker configs or empty map if broker's configs retrieval not supported.
*/
public Mono<Map<Integer, List<ConfigEntry>>> loadBrokersConfig(List<Integer> brokerIds) {
checkClosed();

return loadBrokersConfig(client, brokerIds);
}

Expand All @@ -355,6 +380,8 @@ public Mono<Map<String, TopicDescription>> describeTopics(Collection<String> top
}

private Mono<Map<String, TopicDescription>> describeTopicsImpl(Collection<String> topics) {
checkClosed();

return toMonoWithExceptionFilter(
client.describeTopics(topics).topicNameValues(),
UnknownTopicOrPartitionException.class,
Expand Down Expand Up @@ -411,6 +438,8 @@ static <K, V> Mono<Map<K, V>> toMonoWithExceptionFilter(Map<K, KafkaFuture<V>> v

public Mono<Map<Integer, Map<String, LogDirDescription>>> describeLogDirs(
Collection<Integer> brokerIds) {
checkClosed();

return toMono(client.describeLogDirs(brokerIds).allDescriptions())
.onErrorResume(UnsupportedVersionException.class, th -> Mono.just(Map.of()))
.onErrorResume(ClusterAuthorizationException.class, th -> Mono.just(Map.of()))
Expand All @@ -421,6 +450,8 @@ public Mono<Map<Integer, Map<String, LogDirDescription>>> describeLogDirs(
}

public Mono<ClusterDescription> describeCluster() {
checkClosed();

return describeClusterImpl(client, getClusterFeatures());
}

Expand All @@ -444,6 +475,8 @@ private static Mono<ClusterDescription> describeClusterImpl(AdminClient client,
}

public Mono<Void> deleteConsumerGroups(Collection<String> groupIds) {
checkClosed();

return toMono(client.deleteConsumerGroups(groupIds).all())
.onErrorResume(GroupIdNotFoundException.class,
th -> Mono.error(new NotFoundException("The group id does not exist")))
Expand All @@ -452,6 +485,8 @@ public Mono<Void> deleteConsumerGroups(Collection<String> groupIds) {
}

public Mono<Void> deleteConsumerGroupOffsets(String groupId, String topicName) {
checkClosed();

return listConsumerGroupOffsets(List.of(groupId), null)
.flatMap(table -> {
// filter TopicPartitions by topicName
Expand All @@ -476,6 +511,8 @@ public Mono<Void> createTopic(String name,
int numPartitions,
@Nullable Integer replicationFactor,
Map<String, String> configs) {
checkClosed();

var newTopic = new NewTopic(
name,
Optional.of(numPartitions),
Expand All @@ -486,10 +523,14 @@ public Mono<Void> createTopic(String name,

public Mono<Void> alterPartitionReassignments(
Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments) {
checkClosed();

return toMono(client.alterPartitionReassignments(reassignments).all());
}

public Mono<Void> createPartitions(Map<String, NewPartitions> newPartitionsMap) {
checkClosed();

return toMono(client.createPartitions(newPartitionsMap).all());
}

Expand All @@ -511,10 +552,14 @@ public Mono<List<String>> listConsumerGroupNames() {
}

public Mono<Collection<ConsumerGroupListing>> listConsumerGroups() {
checkClosed();

return toMono(client.listConsumerGroups().all());
}

public Mono<Map<String, ConsumerGroupDescription>> describeConsumerGroups(Collection<String> groupIds) {
checkClosed();

return partitionCalls(
groupIds,
25,
Expand All @@ -529,6 +574,8 @@ public Mono<Map<String, ConsumerGroupDescription>> describeConsumerGroups(Collec
public Mono<Table<String, TopicPartition, Long>> listConsumerGroupOffsets(List<String> consumerGroups,
// all partitions if null passed
@Nullable List<TopicPartition> partitions) {
checkClosed();

Function<Collection<String>, Mono<Map<String, Map<TopicPartition, OffsetAndMetadata>>>> call =
groups -> toMono(
client.listConsumerGroupOffsets(
Expand Down Expand Up @@ -560,6 +607,8 @@ public Mono<Table<String, TopicPartition, Long>> listConsumerGroupOffsets(List<S
}

public Mono<Void> alterConsumerGroupOffsets(String groupId, Map<TopicPartition, Long> offsets) {
checkClosed();

return toMono(client.alterConsumerGroupOffsets(
groupId,
offsets.entrySet().stream()
Expand Down Expand Up @@ -646,6 +695,8 @@ static Set<TopicPartition> filterPartitionsWithLeaderCheck(Collection<TopicDescr
@KafkaClientInternalsDependant
@VisibleForTesting
Mono<Map<TopicPartition, Long>> listOffsetsUnsafe(Collection<TopicPartition> partitions, OffsetSpec offsetSpec) {
checkClosed();

if (partitions.isEmpty()) {
return Mono.just(Map.of());
}
Expand All @@ -672,49 +723,60 @@ Mono<Map<TopicPartition, Long>> listOffsetsUnsafe(Collection<TopicPartition> par
}

public Mono<Collection<AclBinding>> listAcls(ResourcePatternFilter filter) {
checkClosed();

Preconditions.checkArgument(getClusterFeatures().contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
return toMono(client.describeAcls(new AclBindingFilter(filter, AccessControlEntryFilter.ANY)).values());
}

public Mono<Void> createAcls(Collection<AclBinding> aclBindings) {
checkClosed();

Preconditions.checkArgument(getClusterFeatures().contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
return toMono(client.createAcls(aclBindings).all());
}

public Mono<Void> deleteAcls(Collection<AclBinding> aclBindings) {
checkClosed();
Preconditions.checkArgument(getClusterFeatures().contains(SupportedFeature.AUTHORIZED_SECURITY_ENABLED));
var filters = aclBindings.stream().map(AclBinding::toFilter).collect(Collectors.toSet());
return toMono(client.deleteAcls(filters).all()).then();
}

public Mono<Void> updateBrokerConfigByName(Integer brokerId, String name, String value) {
checkClosed();
ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(brokerId));
AlterConfigOp op = new AlterConfigOp(new ConfigEntry(name, value), AlterConfigOp.OpType.SET);
return toMono(client.incrementalAlterConfigs(Map.of(cr, List.of(op))).all());
}

public Mono<Void> deleteRecords(Map<TopicPartition, Long> offsets) {
checkClosed();
var records = offsets.entrySet().stream()
.map(entry -> Map.entry(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue())))
.collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
return toMono(client.deleteRecords(records).all());
}

public Mono<Void> alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment) {
checkClosed();
return toMono(client.alterReplicaLogDirs(replicaAssignment).all());
}

public Mono<Map<ClientQuotaEntity, Map<String, Double>>> getClientQuotas(ClientQuotaFilter filter) {
checkClosed();
return toMono(client.describeClientQuotas(filter).entities());
}

public Mono<Void> alterClientQuota(ClientQuotaAlteration alteration) {
checkClosed();
return toMono(client.alterClientQuotas(List.of(alteration)).all());
}


// returns tp -> list of active producer's states (if any)
public Mono<Map<TopicPartition, List<ProducerState>>> getActiveProducersState(String topic) {
checkClosed();
return describeTopic(topic)
.map(td -> client.describeProducers(
IntStream.range(0, td.partitions().size())
Expand All @@ -731,6 +793,7 @@ public Mono<Map<TopicPartition, List<ProducerState>>> getActiveProducersState(St
private Mono<Void> incrementalAlterConfig(String topicName,
List<ConfigEntry> currentConfigs,
Map<String, String> newConfigs) {
checkClosed();
var configsToDelete = currentConfigs.stream()
.filter(e -> e.source() == ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG) //manually set configs only
.filter(e -> !newConfigs.containsKey(e.name()))
Expand All @@ -748,6 +811,7 @@ private Mono<Void> incrementalAlterConfig(String topicName,

@SuppressWarnings("deprecation")
private Mono<Void> alterConfig(String topicName, Map<String, String> configs) {
checkClosed();
List<ConfigEntry> configEntries = configs.entrySet().stream()
.flatMap(cfg -> Stream.of(new ConfigEntry(cfg.getKey(), cfg.getValue())))
.collect(toList());
Expand Down Expand Up @@ -800,8 +864,15 @@ private static <K, V> BiFunction<Map<K, V>, Map<K, V>, Map<K, V>> mapMerger() {
};
}

private void checkClosed() {
if (this.closedContext != null) {
log.error("AdminClient is already closed at:\n{}", closedContext);
}
}

@Override
public void close() {
this.closedContext = ClosedContext.capture();
client.close();
}
}
Loading