KAFKA-15585: DescribeTopicPartitions client side change. (#15470)

Add support for the DescribeTopicPartitions API to the AdminClient. For this initial implementation, we simply load all of the results into memory on the client side.

Reviewers: Andrew Schofield <aschofield@confluent.io>, Kirk True <ktrue@confluent.io>, David Jacot <djacot@confluent.io>, Artem Livshits <alivshits@confluent.io>, David Arthur <mumrah@gmail.com>
Calvin Liu 2024-04-18 09:09:14 -07:00 committed by GitHub
parent aee9724ee1
commit 53ff1a5a58
15 changed files with 617 additions and 97 deletions
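As a minimal usage sketch (not part of the commit; the bootstrap address and topic names are placeholders, and the broker is assumed to support DescribeTopicPartitions — on older brokers the client falls back to the Metadata API, as the KafkaAdminClient change below shows):

import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeTopicsOptions;
import org.apache.kafka.clients.admin.TopicDescription;

public class DescribeTopicsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Cap each DescribeTopicPartitions response at 500 partitions; the client
            // pages through the rest with the response cursor and aggregates in memory.
            DescribeTopicsOptions options = new DescribeTopicsOptions()
                    .partitionSizeLimitPerResponse(500);
            Map<String, TopicDescription> descriptions = admin
                    .describeTopics(Arrays.asList("orders", "payments"), options)
                    .allTopicNames()
                    .get();
            descriptions.forEach((name, description) ->
                    System.out.println(name + ": " + description.partitions().size() + " partitions"));
        }
    }
}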

File: DescribeTopicsOptions.java

@ -30,6 +30,7 @@ import java.util.Collection;
public class DescribeTopicsOptions extends AbstractOptions<DescribeTopicsOptions> {
private boolean includeAuthorizedOperations;
private int partitionSizeLimitPerResponse = 2000;
/**
* Set the timeout in milliseconds for this operation or {@code null} if the default api timeout for the
@ -47,8 +48,18 @@ public class DescribeTopicsOptions extends AbstractOptions<DescribeTopicsOptions
return this;
}
// Note that partitionSizeLimitPerResponse is capped by the server-side config
// max.request.partition.size.limit; a larger value here has no effect.
public DescribeTopicsOptions partitionSizeLimitPerResponse(int partitionSizeLimitPerResponse) {
this.partitionSizeLimitPerResponse = partitionSizeLimitPerResponse;
return this;
}
public boolean includeAuthorizedOperations() {
return includeAuthorizedOperations;
}
public int partitionSizeLimitPerResponse() {
return partitionSizeLimitPerResponse;
}
}

File: KafkaAdminClient.java

@ -35,9 +35,9 @@ import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec.TimestampSpec;
import org.apache.kafka.clients.admin.internals.AbortTransactionHandler;
import org.apache.kafka.clients.admin.internals.AdminApiDriver;
import org.apache.kafka.clients.admin.internals.AdminApiFuture;
import org.apache.kafka.clients.admin.internals.AdminApiFuture.SimpleAdminApiFuture;
import org.apache.kafka.clients.admin.internals.AdminApiHandler;
import org.apache.kafka.clients.admin.internals.AdminBootstrapAddresses;
import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
import org.apache.kafka.clients.admin.internals.AllBrokersStrategy;
@ -138,6 +138,11 @@ import org.apache.kafka.common.message.DescribeLogDirsRequestData;
import org.apache.kafka.common.message.DescribeLogDirsRequestData.DescribableLogDirTopic;
import org.apache.kafka.common.message.DescribeLogDirsResponseData;
import org.apache.kafka.common.message.DescribeQuorumResponseData;
import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData;
import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData.TopicRequest;
import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition;
import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic;
import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData;
import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData.UserName;
import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
@ -202,11 +207,13 @@ import org.apache.kafka.common.requests.DescribeDelegationTokenRequest;
import org.apache.kafka.common.requests.DescribeDelegationTokenResponse;
import org.apache.kafka.common.requests.DescribeLogDirsRequest;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.apache.kafka.common.requests.DescribeQuorumRequest;
import org.apache.kafka.common.requests.DescribeQuorumRequest.Builder;
import org.apache.kafka.common.requests.DescribeQuorumResponse;
import org.apache.kafka.common.requests.DescribeTopicPartitionsRequest;
import org.apache.kafka.common.requests.DescribeTopicPartitionsResponse;
import org.apache.kafka.common.requests.DescribeUserScramCredentialsRequest;
import org.apache.kafka.common.requests.DescribeUserScramCredentialsResponse;
import org.apache.kafka.common.requests.ElectLeadersRequest;
import org.apache.kafka.common.requests.ElectLeadersResponse;
import org.apache.kafka.common.requests.ExpireDelegationTokenRequest;
@ -255,6 +262,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@ -263,6 +271,7 @@ import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -2110,27 +2119,18 @@ public class KafkaAdminClient extends AdminClient {
if (topics instanceof TopicIdCollection)
return DescribeTopicsResult.ofTopicIds(handleDescribeTopicsByIds(((TopicIdCollection) topics).topicIds(), options));
else if (topics instanceof TopicNameCollection)
return DescribeTopicsResult.ofTopicNames(handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(((TopicNameCollection) topics).topicNames(), options));
else
throw new IllegalArgumentException("The TopicCollection: " + topics + " provided did not match any supported classes for describeTopics.");
}
Call generateDescribeTopicsCallWithMetadataApi(
List<String> topicNamesList,
Map<String, KafkaFutureImpl<TopicDescription>> topicFutures,
DescribeTopicsOptions options,
long now
) {
return new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()),
new LeastLoadedNodeProvider()) {
private boolean supportsDisablingTopicCreation = true;
@ -2185,9 +2185,155 @@ public class KafkaAdminClient extends AdminClient {
completeAllExceptionally(topicFutures.values(), throwable);
}
};
}
Call generateDescribeTopicsCallWithDescribeTopicPartitionsApi(
List<String> topicNamesList,
Map<String, KafkaFutureImpl<TopicDescription>> topicFutures,
Map<Integer, Node> nodes,
DescribeTopicsOptions options,
long now
) {
final Map<String, TopicRequest> topicsRequests = new LinkedHashMap<>();
topicNamesList.stream().sorted().forEach(topic -> {
topicsRequests.put(topic, new TopicRequest().setName(topic));
});
return new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()),
new LeastLoadedNodeProvider()) {
TopicDescription partiallyFinishedTopicDescription = null;
@Override
DescribeTopicPartitionsRequest.Builder createRequest(int timeoutMs) {
DescribeTopicPartitionsRequestData request = new DescribeTopicPartitionsRequestData()
.setTopics(new ArrayList<>(topicsRequests.values()))
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
if (partiallyFinishedTopicDescription != null) {
// If the previous cursor points to partition 0, it will not be set here. Instead, the previous
// cursor topic will be the first topic in the request.
request.setCursor(new DescribeTopicPartitionsRequestData.Cursor()
.setTopicName(partiallyFinishedTopicDescription.name())
.setPartitionIndex(partiallyFinishedTopicDescription.partitions().size())
);
}
return new DescribeTopicPartitionsRequest.Builder(request);
}
@SuppressWarnings("NPathComplexity")
@Override
void handleResponse(AbstractResponse abstractResponse) {
DescribeTopicPartitionsResponse response = (DescribeTopicPartitionsResponse) abstractResponse;
DescribeTopicPartitionsResponseData.Cursor responseCursor = response.data().nextCursor();
// The topicDescription for the cursor topic of the current batch.
TopicDescription nextTopicDescription = null;
for (DescribeTopicPartitionsResponseTopic topic : response.data().topics()) {
String topicName = topic.name();
Errors error = Errors.forCode(topic.errorCode());
KafkaFutureImpl<TopicDescription> future = topicFutures.get(topicName);
if (error != Errors.NONE) {
future.completeExceptionally(error.exception());
topicsRequests.remove(topicName);
if (responseCursor != null && responseCursor.topicName().equals(topicName)) {
responseCursor = null;
}
continue;
}
TopicDescription currentTopicDescription = getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes);
if (partiallyFinishedTopicDescription != null && partiallyFinishedTopicDescription.name().equals(topicName)) {
// Add the partitions for the cursor topic of the previous batch.
partiallyFinishedTopicDescription.partitions().addAll(currentTopicDescription.partitions());
continue;
}
if (responseCursor != null && responseCursor.topicName().equals(topicName)) {
// A single response batch can contain partitions for both the previous cursor
// topic and the current cursor topic. Cache the current cursor topic's result
// in nextTopicDescription until this batch is fully processed.
nextTopicDescription = currentTopicDescription;
continue;
}
topicsRequests.remove(topicName);
future.complete(currentTopicDescription);
}
if (partiallyFinishedTopicDescription != null &&
(responseCursor == null || !responseCursor.topicName().equals(partiallyFinishedTopicDescription.name()))) {
// We can't simply check nextTopicDescription != null here to close the
// partiallyFinishedTopicDescription, because the response cursor topic may not
// appear in this response at all.
String topicName = partiallyFinishedTopicDescription.name();
topicFutures.get(topicName).complete(partiallyFinishedTopicDescription);
topicsRequests.remove(topicName);
partiallyFinishedTopicDescription = null;
}
if (nextTopicDescription != null) {
partiallyFinishedTopicDescription = nextTopicDescription;
}
if (!topicsRequests.isEmpty()) {
runnable.call(this, time.milliseconds());
}
}
@Override
boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
final long now = time.milliseconds();
log.warn("The DescribeTopicPartitions API is not supported, using Metadata API to describe topics.");
runnable.call(generateDescribeTopicsCallWithMetadataApi(topicNamesList, topicFutures, options, now), now);
return false;
}
@Override
void handleFailure(Throwable throwable) {
if (!(throwable instanceof UnsupportedVersionException)) {
completeAllExceptionally(topicFutures.values(), throwable);
}
}
};
}
private Map<String, KafkaFuture<TopicDescription>> handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
final Collection<String> topicNames,
DescribeTopicsOptions options
) {
final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size());
final ArrayList<String> topicNamesList = new ArrayList<>();
for (String topicName : topicNames) {
if (topicNameIsUnrepresentable(topicName)) {
KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>();
future.completeExceptionally(new InvalidTopicException("The given topic name '" +
topicName + "' cannot be represented in a request."));
topicFutures.put(topicName, future);
} else if (!topicFutures.containsKey(topicName)) {
topicFutures.put(topicName, new KafkaFutureImpl<>());
topicNamesList.add(topicName);
}
}
if (topicNamesList.isEmpty()) {
return new HashMap<>(topicFutures);
}
// First, we need to retrieve the node info.
DescribeClusterResult clusterResult = describeCluster();
Map<Integer, Node> nodes;
try {
nodes = clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> node));
} catch (InterruptedException | ExecutionException e) {
completeAllExceptionally(topicFutures.values(), e.getCause());
return new HashMap<>(topicFutures);
}
final long now = time.milliseconds();
runnable.call(
generateDescribeTopicsCallWithDescribeTopicPartitionsApi(topicNamesList, topicFutures, nodes, options, now),
now
);
return new HashMap<>(topicFutures);
}
@ -2256,6 +2402,18 @@ public class KafkaAdminClient extends AdminClient {
return new HashMap<>(topicFutures);
}
private TopicDescription getTopicDescriptionFromDescribeTopicsResponseTopic(
DescribeTopicPartitionsResponseTopic topic,
Map<Integer, Node> nodes
) {
List<DescribeTopicPartitionsResponsePartition> partitionInfos = topic.partitions();
List<TopicPartitionInfo> partitions = new ArrayList<>(partitionInfos.size());
for (DescribeTopicPartitionsResponsePartition partitionInfo : partitionInfos) {
partitions.add(DescribeTopicPartitionsResponse.partitionToTopicPartitionInfo(partitionInfo, nodes));
}
return new TopicDescription(topic.name(), topic.isInternal(), partitions, validAclOperations(topic.topicAuthorizedOperations()), topic.topicId());
}
private TopicDescription getTopicDescriptionFromCluster(Cluster cluster, String topicName, Uuid topicId,
Integer authorizedOperations) {
boolean isInternal = cluster.internalTopics().contains(topicName);
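The cursor handling above is easier to see in isolation. The following standalone sketch models only the paging contract, with plain collections standing in for the request and response types (topic names, partition counts, and the limit of 2 are made up): a response that runs out of budget mid-topic returns a cursor naming that topic and the next partition index, and the next request resumes from there.

import java.util.Map;
import java.util.TreeMap;

public class CursorPagingSketch {
    public static void main(String[] args) {
        // Hypothetical cluster state: topic name -> partition count. The client sorts
        // topic names before building the request, hence the TreeMap.
        TreeMap<String, Integer> partitionCounts = new TreeMap<>();
        partitionCounts.put("test-0", 2);
        partitionCounts.put("test-1", 3);
        int limit = 2; // plays the role of partitionSizeLimitPerResponse

        String cursorTopic = partitionCounts.firstKey();
        int cursorPartition = 0;
        int page = 1;
        while (cursorTopic != null) {
            int budget = limit;
            StringBuilder batch = new StringBuilder();
            String nextTopic = null;
            int nextPartition = 0;
            for (Map.Entry<String, Integer> entry : partitionCounts.tailMap(cursorTopic).entrySet()) {
                int start = entry.getKey().equals(cursorTopic) ? cursorPartition : 0;
                for (int p = start; p < entry.getValue(); p++) {
                    if (budget == 0) {
                        nextTopic = entry.getKey(); // partially finished topic: the next
                        nextPartition = p;          // response resumes at this partition
                        break;
                    }
                    batch.append(entry.getKey()).append('-').append(p).append(' ');
                    budget--;
                }
                if (nextTopic != null) break;
            }
            System.out.println("page " + page++ + ": " + batch
                    + (nextTopic == null ? "(done)" : "-> cursor " + nextTopic + "/" + nextPartition));
            cursorTopic = nextTopic;
            cursorPartition = nextPartition;
        }
    }
}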

File: TopicPartitionInfo.java

@ -31,6 +31,8 @@ public class TopicPartitionInfo {
private final Node leader;
private final List<Node> replicas;
private final List<Node> isr;
private final List<Node> elr;
private final List<Node> lastKnownElr;
/**
* Create an instance of this class with the provided parameters.
@ -40,12 +42,32 @@ public class TopicPartitionInfo {
* @param replicas the replicas of the partition in the same order as the replica assignment (the preferred replica
* is the head of the list)
* @param isr the in-sync replicas
* @param elr the eligible leader replicas
* @param lastKnownElr the last known eligible leader replicas.
*/
public TopicPartitionInfo(
int partition,
Node leader,
List<Node> replicas,
List<Node> isr,
List<Node> elr,
List<Node> lastKnownElr
) {
this.partition = partition;
this.leader = leader;
this.replicas = Collections.unmodifiableList(replicas);
this.isr = Collections.unmodifiableList(isr);
this.elr = Collections.unmodifiableList(elr);
this.lastKnownElr = Collections.unmodifiableList(lastKnownElr);
}
public TopicPartitionInfo(int partition, Node leader, List<Node> replicas, List<Node> isr) {
this.partition = partition;
this.leader = leader;
this.replicas = Collections.unmodifiableList(replicas);
this.isr = Collections.unmodifiableList(isr);
this.elr = null;
this.lastKnownElr = null;
}
/**
@ -79,9 +101,26 @@ public class TopicPartitionInfo {
return isr;
}
/**
* Return the eligible leader replicas of the partition. Note that the ordering of the result is unspecified.
*/
public List<Node> elr() {
return elr;
}
/**
* Return the last known eligible leader replicas of the partition. Note that the ordering of the result is unspecified.
*/
public List<Node> lastKnownElr() {
return lastKnownElr;
}
public String toString() {
String elrString = elr != null ? Utils.join(elr, ", ") : "N/A";
String lastKnownElrString = lastKnownElr != null ? Utils.join(lastKnownElr, ", ") : "N/A";
return "(partition=" + partition + ", leader=" + leader + ", replicas=" +
Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") +
", elr=" + elrString + ", lastKnownElr=" + lastKnownElrString + ")";
}
@Override
@ -94,7 +133,9 @@ public class TopicPartitionInfo {
return partition == that.partition &&
Objects.equals(leader, that.leader) &&
Objects.equals(replicas, that.replicas) &&
Objects.equals(isr, that.isr) &&
Objects.equals(elr, that.elr) &&
Objects.equals(lastKnownElr, that.lastKnownElr);
}
@Override
@ -103,6 +144,8 @@ public class TopicPartitionInfo {
result = 31 * result + (leader != null ? leader.hashCode() : 0);
result = 31 * result + (replicas != null ? replicas.hashCode() : 0);
result = 31 * result + (isr != null ? isr.hashCode() : 0);
result = 31 * result + (elr != null ? elr.hashCode() : 0);
result = 31 * result + (lastKnownElr != null ? lastKnownElr.hashCode() : 0);
return result;
}
}
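A small sketch of the widened constructor and the new toString output (broker ids and hostnames are made up). The legacy four-argument constructor leaves both ELR fields null, which toString renders as N/A:

import java.util.Arrays;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;

public class ElrInfoSketch {
    public static void main(String[] args) {
        Node n0 = new Node(0, "broker0", 9092);
        Node n1 = new Node(1, "broker1", 9092);
        Node n2 = new Node(2, "broker2", 9092);
        TopicPartitionInfo withElr = new TopicPartitionInfo(
                0, n0,
                Arrays.asList(n0, n1, n2), // replicas
                Arrays.asList(n0),         // isr
                Arrays.asList(n1),         // elr
                Arrays.asList(n2));        // lastKnownElr
        System.out.println(withElr); // includes elr=... and lastKnownElr=...
        TopicPartitionInfo legacy = new TopicPartitionInfo(0, n0, Arrays.asList(n0, n1), Arrays.asList(n0));
        System.out.println(legacy);  // prints elr=N/A, lastKnownElr=N/A
    }
}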

File: DescribeTopicPartitionsResponse.java

@ -17,6 +17,8 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic;
import org.apache.kafka.common.protocol.ApiKeys;
@ -27,6 +29,7 @@ import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class DescribeTopicPartitionsResponse extends AbstractResponse {
private final DescribeTopicPartitionsResponseData data;
@ -80,4 +83,16 @@ public class DescribeTopicPartitionsResponse extends AbstractResponse {
return new DescribeTopicPartitionsResponse(
new DescribeTopicPartitionsResponseData(new ByteBufferAccessor(buffer), version));
}
public static TopicPartitionInfo partitionToTopicPartitionInfo(
DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition partition,
Map<Integer, Node> nodes) {
return new TopicPartitionInfo(
partition.partitionIndex(),
nodes.get(partition.leaderId()),
partition.replicaNodes().stream().map(id -> nodes.getOrDefault(id, new Node(id, "", -1))).collect(Collectors.toList()),
partition.isrNodes().stream().map(id -> nodes.getOrDefault(id, new Node(id, "", -1))).collect(Collectors.toList()),
partition.eligibleLeaderReplicas().stream().map(id -> nodes.getOrDefault(id, new Node(id, "", -1))).collect(Collectors.toList()),
partition.lastKnownElr().stream().map(id -> nodes.getOrDefault(id, new Node(id, "", -1))).collect(Collectors.toList()));
}
}
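A short sketch of this mapping (broker ids and hostnames are made up): replica, ISR, and ELR ids that are missing from the node map come back as placeholder nodes rather than nulls:

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition;
import org.apache.kafka.common.requests.DescribeTopicPartitionsResponse;

public class PartitionMappingSketch {
    public static void main(String[] args) {
        DescribeTopicPartitionsResponsePartition partition = new DescribeTopicPartitionsResponsePartition()
                .setPartitionIndex(0)
                .setLeaderId(0)
                .setReplicaNodes(Arrays.asList(0, 1, 2))
                .setIsrNodes(Arrays.asList(0, 1))
                .setEligibleLeaderReplicas(Arrays.asList(2))
                .setLastKnownElr(Collections.emptyList());
        Map<Integer, Node> nodes = new HashMap<>();
        nodes.put(0, new Node(0, "broker0", 9092));
        nodes.put(1, new Node(1, "broker1", 9092));
        // Broker 2 is absent from the map, so it maps to the placeholder Node(2, "", -1).
        TopicPartitionInfo info = DescribeTopicPartitionsResponse.partitionToTopicPartitionInfo(partition, nodes);
        System.out.println(info);
    }
}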

File: KafkaAdminClientTest.java

@ -105,6 +105,10 @@ import org.apache.kafka.common.message.DescribeLogDirsResponseData;
import org.apache.kafka.common.message.DescribeLogDirsResponseData.DescribeLogDirsTopic;
import org.apache.kafka.common.message.DescribeProducersResponseData;
import org.apache.kafka.common.message.DescribeQuorumResponseData;
import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData;
import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition;
import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic;
import org.apache.kafka.common.message.DescribeTransactionsResponseData;
import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData.CredentialInfo;
@ -177,6 +181,7 @@ import org.apache.kafka.common.requests.DescribeProducersRequest;
import org.apache.kafka.common.requests.DescribeProducersResponse;
import org.apache.kafka.common.requests.DescribeQuorumRequest;
import org.apache.kafka.common.requests.DescribeQuorumResponse;
import org.apache.kafka.common.requests.DescribeTopicPartitionsResponse;
import org.apache.kafka.common.requests.DescribeTransactionsRequest;
import org.apache.kafka.common.requests.DescribeTransactionsResponse;
import org.apache.kafka.common.requests.DescribeUserScramCredentialsResponse;
@ -1401,42 +1406,235 @@ public class KafkaAdminClientTest {
}
}
@SuppressWarnings("NPathComplexity")
@Test
public void testMetadataRetries() throws Exception {
// We should continue retrying on metadata update failures in spite of retry configuration
public void testDescribeTopicsWithDescribeTopicPartitionsApiBasic() {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
String topicName0 = "test-0";
String topicName1 = "test-1";
Map<String, Uuid> topics = new HashMap<>();
topics.put(topicName0, Uuid.randomUuid());
topics.put(topicName1, Uuid.randomUuid());
String topic = "topic";
Cluster bootstrapCluster = Cluster.bootstrap(singletonList(new InetSocketAddress("localhost", 9999)));
Cluster initializedCluster = mockCluster(3, 0);
env.kafkaClient().prepareResponse(
prepareDescribeClusterResponse(0,
env.cluster().nodes(),
env.cluster().clusterResource().clusterId(),
2,
MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED)
);
try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, bootstrapCluster,
newStrMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999",
AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "10000000",
AdminClientConfig.RETRIES_CONFIG, "0"))) {
DescribeTopicPartitionsResponseData dataFirstPart = new DescribeTopicPartitionsResponseData();
addPartitionToDescribeTopicPartitionsResponse(dataFirstPart, topicName0, topics.get(topicName0), Arrays.asList(0));
dataFirstPart.setNextCursor(new DescribeTopicPartitionsResponseData.Cursor()
.setTopicName(topicName0)
.setPartitionIndex(1));
env.kafkaClient().prepareResponse(body -> {
DescribeTopicPartitionsRequestData request = (DescribeTopicPartitionsRequestData) body.data();
if (request.topics().size() != 2) return false;
if (!request.topics().get(0).name().equals(topicName0)) return false;
if (!request.topics().get(1).name().equals(topicName1)) return false;
if (request.cursor() != null) return false;
return true;
}, new DescribeTopicPartitionsResponse(dataFirstPart));
// The first request fails with a disconnect
env.kafkaClient().prepareResponse(null, true);
DescribeTopicPartitionsResponseData dataSecondPart = new DescribeTopicPartitionsResponseData();
addPartitionToDescribeTopicPartitionsResponse(dataSecondPart, topicName0, topics.get(topicName0), Arrays.asList(1));
addPartitionToDescribeTopicPartitionsResponse(dataSecondPart, topicName1, topics.get(topicName1), Arrays.asList(0));
env.kafkaClient().prepareResponse(body -> {
DescribeTopicPartitionsRequestData request = (DescribeTopicPartitionsRequestData) body.data();
if (request.topics().size() != 2) return false;
if (!request.topics().get(0).name().equals(topicName0)) return false;
if (!request.topics().get(1).name().equals(topicName1)) return false;
// The next one succeeds and gives us the controller id
env.kafkaClient().prepareResponse(RequestTestUtils.metadataResponse(initializedCluster.nodes(),
initializedCluster.clusterResource().clusterId(),
initializedCluster.controller().id(),
Collections.emptyList()));
DescribeTopicPartitionsRequestData.Cursor cursor = request.cursor();
if (cursor == null || cursor.topicName() != topicName0 || cursor.partitionIndex() != 1) return false;
// Then we respond to the DescribeTopic request
Node leader = initializedCluster.nodes().get(0);
MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(
Errors.NONE, new TopicPartition(topic, 0), Optional.of(leader.id()), Optional.of(10),
singletonList(leader.id()), singletonList(leader.id()), singletonList(leader.id()));
env.kafkaClient().prepareResponse(RequestTestUtils.metadataResponse(initializedCluster.nodes(),
initializedCluster.clusterResource().clusterId(), 1,
singletonList(new MetadataResponse.TopicMetadata(Errors.NONE, topic, Uuid.ZERO_UUID, false,
singletonList(partitionMetadata), MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED))));
return true;
}, new DescribeTopicPartitionsResponse(dataSecondPart));
try {
DescribeTopicsResult result = env.adminClient().describeTopics(
Arrays.asList(topicName0, topicName1), new DescribeTopicsOptions()
);
Map<String, TopicDescription> topicDescriptions = result.allTopicNames().get();
assertEquals(2, topicDescriptions.size());
TopicDescription topicDescription = topicDescriptions.get(topicName0);
assertEquals(2, topicDescription.partitions().size());
assertEquals(0, topicDescription.partitions().get(0).partition());
assertEquals(1, topicDescription.partitions().get(1).partition());
topicDescription = topicDescriptions.get(topicName1);
assertEquals(1, topicDescription.partitions().size());
} catch (Exception e) {
fail("describe using DescribeTopics API should not fail", e);
}
}
}
DescribeTopicsResult result = env.adminClient().describeTopics(singleton(topic));
Map<String, TopicDescription> topicDescriptions = result.allTopicNames().get();
assertEquals(leader, topicDescriptions.get(topic).partitions().get(0).leader());
assertNull(topicDescriptions.get(topic).authorizedOperations());
@SuppressWarnings({"NPathComplexity", "CyclomaticComplexity"})
@Test
public void testDescribeTopicsWithDescribeTopicPartitionsApiEdgeCase() {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
String topicName0 = "test-0";
String topicName1 = "test-1";
String topicName2 = "test-2";
Map<String, Uuid> topics = new HashMap<>();
topics.put(topicName0, Uuid.randomUuid());
topics.put(topicName1, Uuid.randomUuid());
topics.put(topicName2, Uuid.randomUuid());
env.kafkaClient().prepareResponse(
prepareDescribeClusterResponse(0,
env.cluster().nodes(),
env.cluster().clusterResource().clusterId(),
2,
MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED)
);
DescribeTopicPartitionsResponseData dataFirstPart = new DescribeTopicPartitionsResponseData();
addPartitionToDescribeTopicPartitionsResponse(dataFirstPart, topicName0, topics.get(topicName0), Arrays.asList(0));
addPartitionToDescribeTopicPartitionsResponse(dataFirstPart, topicName1, topics.get(topicName1), Arrays.asList(0));
dataFirstPart.setNextCursor(new DescribeTopicPartitionsResponseData.Cursor()
.setTopicName(topicName1)
.setPartitionIndex(1));
env.kafkaClient().prepareResponse(body -> {
DescribeTopicPartitionsRequestData request = (DescribeTopicPartitionsRequestData) body.data();
if (request.topics().size() != 3) return false;
if (!request.topics().get(0).name().equals(topicName0)) return false;
if (!request.topics().get(1).name().equals(topicName1)) return false;
if (!request.topics().get(2).name().equals(topicName2)) return false;
if (request.cursor() != null) return false;
return true;
}, new DescribeTopicPartitionsResponse(dataFirstPart));
DescribeTopicPartitionsResponseData dataSecondPart = new DescribeTopicPartitionsResponseData();
addPartitionToDescribeTopicPartitionsResponse(dataSecondPart, topicName1, topics.get(topicName1), Arrays.asList(1));
addPartitionToDescribeTopicPartitionsResponse(dataSecondPart, topicName2, topics.get(topicName2), Arrays.asList(0));
dataSecondPart.setNextCursor(new DescribeTopicPartitionsResponseData.Cursor()
.setTopicName(topicName2)
.setPartitionIndex(1));
env.kafkaClient().prepareResponse(body -> {
DescribeTopicPartitionsRequestData request = (DescribeTopicPartitionsRequestData) body.data();
if (request.topics().size() != 2) return false;
if (!request.topics().get(0).name().equals(topicName1)) return false;
if (!request.topics().get(1).name().equals(topicName2)) return false;
DescribeTopicPartitionsRequestData.Cursor cursor = request.cursor();
if (cursor == null || !cursor.topicName().equals(topicName1) || cursor.partitionIndex() != 1) return false;
return true;
}, new DescribeTopicPartitionsResponse(dataSecondPart));
DescribeTopicPartitionsResponseData dataThirdPart = new DescribeTopicPartitionsResponseData();
addPartitionToDescribeTopicPartitionsResponse(dataThirdPart, topicName2, topics.get(topicName2), Arrays.asList(1));
env.kafkaClient().prepareResponse(body -> {
DescribeTopicPartitionsRequestData request = (DescribeTopicPartitionsRequestData) body.data();
if (request.topics().size() != 1) return false;
if (!request.topics().get(0).name().equals(topicName2)) return false;
DescribeTopicPartitionsRequestData.Cursor cursor = request.cursor();
if (cursor == null || !cursor.topicName().equals(topicName2) || cursor.partitionIndex() != 1) return false;
return true;
}, new DescribeTopicPartitionsResponse(dataThirdPart));
try {
DescribeTopicsResult result = env.adminClient().describeTopics(
Arrays.asList(topicName1, topicName0, topicName2), new DescribeTopicsOptions()
);
Map<String, TopicDescription> topicDescriptions = result.allTopicNames().get();
assertEquals(3, topicDescriptions.size());
TopicDescription topicDescription = topicDescriptions.get(topicName0);
assertEquals(1, topicDescription.partitions().size());
assertEquals(0, topicDescription.partitions().get(0).partition());
topicDescription = topicDescriptions.get(topicName1);
assertEquals(2, topicDescription.partitions().size());
topicDescription = topicDescriptions.get(topicName2);
assertEquals(2, topicDescription.partitions().size());
} catch (Exception e) {
fail("describe using DescribeTopics API should not fail", e);
}
}
}
private void addPartitionToDescribeTopicPartitionsResponse(
DescribeTopicPartitionsResponseData data,
String topicName,
Uuid topicId,
List<Integer> partitions) {
List<DescribeTopicPartitionsResponsePartition> addingPartitions = new ArrayList<>();
partitions.forEach(partition -> {
addingPartitions.add(new DescribeTopicPartitionsResponsePartition()
.setIsrNodes(Arrays.asList(0))
.setErrorCode((short) 0)
.setLeaderEpoch(0)
.setLeaderId(0)
.setEligibleLeaderReplicas(Arrays.asList(1))
.setLastKnownElr(Arrays.asList(2))
.setPartitionIndex(partition)
.setReplicaNodes(Arrays.asList(0, 1, 2)));
});
data.topics().add(new DescribeTopicPartitionsResponseTopic()
.setErrorCode((short) 0)
.setTopicId(topicId)
.setName(topicName)
.setIsInternal(false)
.setPartitions(addingPartitions));
}
@SuppressWarnings("NPathComplexity")
@Test
public void testDescribeTopicsWithDescribeTopicPartitionsApiErrorHandling() {
try (AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
String topicName0 = "test-0";
String topicName1 = "test-1";
Map<String, Uuid> topics = new HashMap<>();
topics.put(topicName0, Uuid.randomUuid());
topics.put(topicName1, Uuid.randomUuid());
env.kafkaClient().prepareResponse(
prepareDescribeClusterResponse(0,
env.cluster().nodes(),
env.cluster().clusterResource().clusterId(),
2,
MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED)
);
DescribeTopicPartitionsResponseData dataFirstPart = new DescribeTopicPartitionsResponseData();
dataFirstPart.topics().add(new DescribeTopicPartitionsResponseTopic()
.setErrorCode((short) 0)
.setTopicId(topics.get(topicName0))
.setName(topicName0)
.setIsInternal(false)
.setPartitions(Arrays.asList(new DescribeTopicPartitionsResponsePartition()
.setIsrNodes(Arrays.asList(0))
.setErrorCode((short) 0)
.setLeaderEpoch(0)
.setLeaderId(0)
.setEligibleLeaderReplicas(Arrays.asList(1))
.setLastKnownElr(Arrays.asList(2))
.setPartitionIndex(0)
.setReplicaNodes(Arrays.asList(0, 1, 2))))
);
dataFirstPart.topics().add(new DescribeTopicPartitionsResponseTopic()
.setErrorCode((short) 29) // TOPIC_AUTHORIZATION_FAILED
.setTopicId(Uuid.ZERO_UUID)
.setName(topicName1)
.setIsInternal(false)
);
env.kafkaClient().prepareResponse(body -> {
DescribeTopicPartitionsRequestData request = (DescribeTopicPartitionsRequestData) body.data();
if (request.topics().size() != 2) return false;
if (!request.topics().get(0).name().equals(topicName0)) return false;
if (!request.topics().get(1).name().equals(topicName1)) return false;
if (request.cursor() != null) return false;
return true;
}, new DescribeTopicPartitionsResponse(dataFirstPart));
DescribeTopicsResult result = env.adminClient().describeTopics(
Arrays.asList(topicName1, topicName0), new DescribeTopicsOptions()
);
try {
TestUtils.assertFutureError(result.allTopicNames(), TopicAuthorizationException.class);
} catch (Exception e) {
fail("describe using DescribeTopics API should not have other exceptions", e);
}
}
}

File: MockAdminClient.java

@ -394,7 +394,7 @@ public class MockAdminClient extends AdminClient {
// Partitions start off on the first log directory of each broker, for now.
List<String> logDirs = new ArrayList<>(numberOfPartitions);
for (int i = 0; i < numberOfPartitions; i++) {
partitions.add(new TopicPartitionInfo(i, brokers.get(0), replicas, Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
logDirs.add(brokerLogDirs.get(partitions.get(i).leader().id()).get(0));
}
Uuid topicId = Uuid.randomUuid();

File: TopicAdminTest.java

@ -30,6 +30,7 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
@ -39,15 +40,18 @@ import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
import org.apache.kafka.common.message.DescribeClusterResponseData;
import org.apache.kafka.common.message.DescribeConfigsResponseData;
import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
import org.apache.kafka.common.message.ListOffsetsResponseData;
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.requests.CreateTopicsResponse;
import org.apache.kafka.common.requests.DescribeClusterResponse;
import org.apache.kafka.common.requests.DescribeConfigsResponse;
import org.apache.kafka.common.requests.DescribeTopicPartitionsResponse;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.utils.MockTime;
@ -78,6 +82,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@SuppressWarnings("ClassDataAbstractionCoupling")
public class TopicAdminTest {
/**
@ -96,25 +101,7 @@ public class TopicAdminTest {
assertTrue(admin.createOrFindTopics(newTopic).isEmpty());
}
}
/**
* 0.11.0.0 clients can talk with older brokers, but the DESCRIBE_TOPIC API was added in 0.10.0.0. That means,
* if our TopicAdmin talks to a pre 0.10.0 broker, it should receive an UnsupportedVersionException, should
* create no topics, and return false.
*/
@Test
public void throwsWithApiVersionMismatchOnDescribe() {
final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
Cluster cluster = createCluster(1);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(describeTopicResponseWithUnsupportedVersion(newTopic));
TopicAdmin admin = new TopicAdmin(env.adminClient());
Exception e = assertThrows(ConnectException.class, () -> admin.describeTopics(newTopic.name()));
assertInstanceOf(UnsupportedVersionException.class, e.getCause());
}
}
@Test
public void returnEmptyWithClusterAuthorizationFailureOnCreate() {
final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
@ -134,6 +121,7 @@ public class TopicAdminTest {
final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
Cluster cluster = createCluster(1);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
env.kafkaClient().prepareResponse(describeClusterResponse(cluster));
env.kafkaClient().prepareResponse(describeTopicResponseWithClusterAuthorizationException(newTopic));
TopicAdmin admin = new TopicAdmin(env.adminClient());
Exception e = assertThrows(ConnectException.class, () -> admin.describeTopics(newTopic.name()));
@ -160,6 +148,7 @@ public class TopicAdminTest {
final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
Cluster cluster = createCluster(1);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(), cluster)) {
env.kafkaClient().prepareResponse(describeClusterResponse(cluster));
env.kafkaClient().prepareResponse(describeTopicResponseWithTopicAuthorizationException(newTopic));
TopicAdmin admin = new TopicAdmin(env.adminClient());
Exception e = assertThrows(ConnectException.class, () -> admin.describeTopics(newTopic.name()));
@ -912,27 +901,44 @@ public class TopicAdminTest {
return byName.get(topicName).get();
}
private MetadataResponse describeTopicResponseWithUnsupportedVersion(NewTopic... topics) {
return describeTopicResponse(new ApiError(Errors.UNSUPPORTED_VERSION, "This version of the API is not supported"), topics);
}
private DescribeTopicPartitionsResponse describeTopicResponseWithClusterAuthorizationException(NewTopic... topics) {
return describeTopicResponse(new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, "Not authorized to create topic(s)"), topics);
}
private DescribeTopicPartitionsResponse describeTopicResponseWithTopicAuthorizationException(NewTopic... topics) {
return describeTopicResponse(new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, "Not authorized to create topic(s)"), topics);
}
private DescribeTopicPartitionsResponse describeTopicResponse(ApiError error, NewTopic... topics) {
if (error == null) error = new ApiError(Errors.NONE, "");
DescribeTopicPartitionsResponseData response = new DescribeTopicPartitionsResponseData();
for (NewTopic topic : topics) {
response.topics().add(new DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic()
.setErrorCode(error.error().code())
.setTopicId(Uuid.ZERO_UUID)
.setName(topic.name())
.setIsInternal(false)
);
}
return new DescribeTopicPartitionsResponse(response);
}
private DescribeClusterResponse describeClusterResponse(Cluster cluster) {
DescribeClusterResponseData data = new DescribeClusterResponseData()
.setErrorCode(Errors.NONE.code())
.setThrottleTimeMs(0)
.setControllerId(cluster.nodes().get(0).id())
.setClusterId(cluster.clusterResource().clusterId())
.setClusterAuthorizedOperations(MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED);
cluster.nodes().forEach(broker ->
data.brokers().add(new DescribeClusterResponseData.DescribeClusterBroker()
.setHost(broker.host())
.setPort(broker.port())
.setBrokerId(broker.id())
.setRack(broker.rack())));
return new DescribeClusterResponse(data);
}
private DescribeConfigsResponse describeConfigsResponseWithUnsupportedVersion(NewTopic... topics) {

File: DescribeTopicPartitionsRequestHandler.java

@ -82,6 +82,11 @@ public class DescribeTopicPartitionsRequestHandler {
}
}
if (cursor != null && cursor.partitionIndex() < 0) {
// The partition id in cursor must be valid.
throw new InvalidRequestException("DescribeTopicPartitionsRequest cursor partition must be valid: " + cursor);
}
// Do not disclose the existence of topics unauthorized for Describe; we have not even
// checked whether they exist.
Set<DescribeTopicPartitionsResponseTopic> unauthorizedForDescribeTopicMetadata = new HashSet<>();
@ -101,7 +106,7 @@ public class DescribeTopicPartitionsRequestHandler {
JavaConverters.asScalaIterator(authorizedTopicsStream.iterator()),
abstractRequest.context().listenerName,
(String topicName) -> topicName.equals(cursorTopicName) ? cursor.partitionIndex() : 0,
Math.min(config.maxRequestPartitionSizeLimit(), request.responsePartitionLimit()),
Math.max(Math.min(config.maxRequestPartitionSizeLimit(), request.responsePartitionLimit()), 1),
fetchAllTopics
);
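The added Math.max(..., 1) guard matters because an effective limit of 0 would let a response return no partitions and make no progress, so a paging client could loop forever. A tiny sketch of the clamp with hypothetical values:

public class LimitClampSketch {
    public static void main(String[] args) {
        int serverMax = 2000; // hypothetical max.request.partition.size.limit
        int requested = 0;    // a client asking for 0 partitions per response
        int effective = Math.max(Math.min(serverMax, requested), 1);
        System.out.println(effective); // prints 1: every response makes progress
    }
}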

File: KafkaApis.scala

@ -1426,7 +1426,9 @@ class KafkaApis(val requestChannel: RequestChannel,
new DescribeTopicPartitionsResponse(response)
})
}
case None => {
requestHelper.sendMaybeThrottle(request, request.body[DescribeTopicPartitionsRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
}
}
}

File: DescribeTopicPartitionsRequestHandlerTest.java

@ -472,6 +472,21 @@ class DescribeTopicPartitionsRequestHandlerTest {
} catch (Exception e) {
assertInstanceOf(InvalidRequestException.class, e, e.getMessage());
}
// 3.4 With a cursor pointing to a negative partition id. An exception should be thrown
// if not querying all the topics.
describeTopicPartitionsRequest = new DescribeTopicPartitionsRequest(new DescribeTopicPartitionsRequestData()
.setTopics(Arrays.asList(
new DescribeTopicPartitionsRequestData.TopicRequest().setName(authorizedTopic),
new DescribeTopicPartitionsRequestData.TopicRequest().setName(authorizedTopic2)
))
.setCursor(new DescribeTopicPartitionsRequestData.Cursor().setTopicName(authorizedTopic).setPartitionIndex(-1))
);
try {
handler.handleDescribeTopicPartitionsRequest(buildRequest(describeTopicPartitionsRequest, plaintextListener));
} catch (Exception e) {
assertInstanceOf(InvalidRequestException.class, e, e.getMessage());
}
}
void updateKraftMetadataCache(KRaftMetadataCache kRaftMetadataCache, List<ApiMessage> records) {

File: InternalTopicManagerTest.java

@ -708,22 +708,22 @@ public class InternalTopicManagerTest {
assertEquals(mkSet(topic1, topic2, topic3, topic4), mockAdminClient.listTopics().names().get());
assertEquals(new TopicDescription(topic1, false, new ArrayList<TopicPartitionInfo>() {
{
add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
}
}), mockAdminClient.describeTopics(Collections.singleton(topic1)).topicNameValues().get(topic1).get());
assertEquals(new TopicDescription(topic2, false, new ArrayList<TopicPartitionInfo>() {
{
add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
}
}), mockAdminClient.describeTopics(Collections.singleton(topic2)).topicNameValues().get(topic2).get());
assertEquals(new TopicDescription(topic3, false, new ArrayList<TopicPartitionInfo>() {
{
add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
}
}), mockAdminClient.describeTopics(Collections.singleton(topic3)).topicNameValues().get(topic3).get());
assertEquals(new TopicDescription(topic4, false, new ArrayList<TopicPartitionInfo>() {
{
add(new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
}
}), mockAdminClient.describeTopics(Collections.singleton(topic4)).topicNameValues().get(topic4).get());

File: TopicCommand.java

@ -28,6 +28,7 @@ import org.apache.kafka.clients.admin.CreatePartitionsOptions;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DeleteTopicsOptions;
import org.apache.kafka.clients.admin.DescribeTopicsOptions;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewPartitions;
@ -367,6 +368,22 @@ public abstract class TopicCommand {
.map(node -> node.toString())
.collect(Collectors.joining(",")));
}
if (info.elr() != null) {
System.out.print("\tElr: " + info.elr().stream()
.map(node -> Integer.toString(node.id()))
.collect(Collectors.joining(",")));
} else {
System.out.print("\tElr: N/A");
}
if (info.lastKnownElr() != null) {
System.out.print("\tLastKnownElr: " + info.lastKnownElr().stream()
.map(node -> Integer.toString(node.id()))
.collect(Collectors.joining(",")));
} else {
System.out.print("\tLastKnownElr: N/A");
}
System.out.print(markedForDeletion ? "\tMarkedForDeletion: true" : "");
System.out.println();
}
@ -568,7 +585,9 @@ public abstract class TopicCommand {
if (!topics.isEmpty()) {
Map<String, org.apache.kafka.clients.admin.TopicDescription> descTopics =
adminClient.describeTopics(TopicCollection.ofTopicNames(topics),
new DescribeTopicsOptions()
.partitionSizeLimitPerResponse(opts.partitionSizeLimitPerResponse().orElse(2000))).allTopicNames().get();
topicDescriptions = new ArrayList<>(descTopics.values());
}
@ -716,6 +735,8 @@ public abstract class TopicCommand {
private final OptionSpecBuilder excludeInternalTopicOpt;
private final ArgumentAcceptingOptionSpec<Integer> partitionSizeLimitPerResponseOpt;
private final Set<OptionSpec<?>> allTopicLevelOpts;
private final Set<OptionSpecBuilder> allReplicationReportOpts;
@ -799,6 +820,11 @@ public abstract class TopicCommand {
"if set when creating topics, the action will only execute if the topic does not already exist.");
excludeInternalTopicOpt = parser.accepts("exclude-internal",
"exclude internal topics when running list or describe command. The internal topics will be listed by default");
partitionSizeLimitPerResponseOpt = parser.accepts("partition-size-limit-per-response",
"the maximum number of partitions included in one DescribeTopicPartitions response.")
.withRequiredArg()
.describedAs("maximum number of partitions in one response.")
.ofType(java.lang.Integer.class);
options = parser.parse(args);
allTopicLevelOpts = new HashSet<>(Arrays.asList(alterOpt, createOpt, describeOpt, listOpt, deleteOpt));
@ -918,6 +944,10 @@ public abstract class TopicCommand {
return has(excludeInternalTopicOpt);
}
public Optional<Integer> partitionSizeLimitPerResponse() {
return valueAsOption(partitionSizeLimitPerResponseOpt);
}
public Optional<List<String>> topicConfig() {
return valuesAsOption(configOpt);
}
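Taken together, a describe run can now cap each response page from the command line, for example: bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --partition-size-limit-per-response 20 (the bootstrap address is illustrative). The integration test below exercises exactly this flag.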

File: TopicCommandIntegrationTest.java

@ -52,8 +52,8 @@ import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import scala.collection.mutable.Buffer;
import java.util.ArrayList;
import java.util.Arrays;
@ -638,6 +638,33 @@ public class TopicCommandIntegrationTest extends kafka.integration.KafkaServerTe
assertTrue(rows[0].startsWith(String.format("Topic: %s", testTopicName)), "Row does not start with " + testTopicName + ". Row is: " + rows[0]);
}
@ParameterizedTest
@ValueSource(strings = {"quorum=zk", "quorum=kraft"})
public void testDescribeWithDescribeTopicPartitionsApi(String quorum) throws ExecutionException, InterruptedException {
TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, 20, (short) 2,
scala.collection.immutable.Map$.MODULE$.empty(), new Properties()
);
TestUtils.createTopicWithAdmin(adminClient, "test-2", scalaBrokers, scalaControllers, 41, (short) 2,
scala.collection.immutable.Map$.MODULE$.empty(), new Properties()
);
TestUtils.createTopicWithAdmin(adminClient, "test-3", scalaBrokers, scalaControllers, 5, (short) 2,
scala.collection.immutable.Map$.MODULE$.empty(), new Properties()
);
TestUtils.createTopicWithAdmin(adminClient, "test-4", scalaBrokers, scalaControllers, 5, (short) 2,
scala.collection.immutable.Map$.MODULE$.empty(), new Properties()
);
TestUtils.createTopicWithAdmin(adminClient, "test-5", scalaBrokers, scalaControllers, 100, (short) 2,
scala.collection.immutable.Map$.MODULE$.empty(), new Properties()
);
String output = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap(
"--describe", "--partition-size-limit-per-response=20"));
String[] rows = output.split("\n");
assertEquals(176, rows.length, String.join("\n", rows));
assertTrue(rows[2].contains("\tElr"), rows[2]);
assertTrue(rows[2].contains("LastKnownElr"), rows[2]);
}
@ParameterizedTest
@ValueSource(strings = {"zk", "kraft"})
public void testDescribeWhenTopicDoesntExist(String quorum) {

File: TopicCommandTest.java

@ -173,6 +173,16 @@ public class TopicCommandTest {
assertEquals(topicName, opts.topic().get());
}
@Test
public void testDescribeWithDescribeTopicsApiShouldSucceed() {
TopicCommand.TopicCommandOptions opts = new TopicCommand.TopicCommandOptions(
new String[] {"--bootstrap-server", bootstrapServer,
"--describe",
"--topic", topicName});
assertTrue(opts.hasDescribeOption());
assertEquals(topicName, opts.topic().get());
}
@Test
public void testParseAssignmentDuplicateEntries() {

File: WorkerUtilsTest.java

@ -79,7 +79,7 @@ public class WorkerUtilsTest {
new TopicDescription(
TEST_TOPIC, false,
Collections.singletonList(
new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList(), Collections.emptyList(), Collections.emptyList()))),
adminClient.describeTopics(
Collections.singleton(TEST_TOPIC)).topicNameValues().get(TEST_TOPIC).get()
);
@ -96,7 +96,7 @@ public class WorkerUtilsTest {
new TopicDescription(
TEST_TOPIC, false,
Collections.singletonList(
new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList(), Collections.emptyList(), Collections.emptyList()))),
adminClient.describeTopics(
Collections.singleton(TEST_TOPIC)).topicNameValues().get(TEST_TOPIC).get()
);
@ -176,7 +176,7 @@ public class WorkerUtilsTest {
new TopicDescription(
TEST_TOPIC, false,
Collections.singletonList(
new TopicPartitionInfo(0, broker1, singleReplica, Collections.emptyList(), Collections.emptyList(), Collections.emptyList()))),
adminClient.describeTopics(Collections.singleton(TEST_TOPIC)).topicNameValues().get(TEST_TOPIC).get()
);
}