diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
index c05e5cfac0f..e7f2c07d9de 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
@@ -82,6 +82,7 @@ public final class MetadataOperationContext<T, O extends AbstractOptions<O>> {
 
     public static void handleMetadataErrors(MetadataResponse response) {
         for (TopicMetadata tm : response.topicMetadata()) {
+            if (shouldRefreshMetadata(tm.error())) throw tm.error().exception();
             for (PartitionMetadata pm : tm.partitionMetadata()) {
                 if (shouldRefreshMetadata(pm.error)) {
                     throw pm.error.exception();
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 28b46401e0b..7f308460440 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -342,11 +342,15 @@ public class KafkaAdminClientTest {
     }
 
     private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors error) {
+        return prepareMetadataResponse(cluster, error, error);
+    }
+
+    private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors topicError, Errors partitionError) {
        List<MetadataResponse.TopicMetadata> metadata = new ArrayList<>();
         for (String topic : cluster.topics()) {
             List<MetadataResponse.PartitionMetadata> pms = new ArrayList<>();
             for (PartitionInfo pInfo : cluster.availablePartitionsForTopic(topic)) {
-                PartitionMetadata pm = new PartitionMetadata(error,
+                PartitionMetadata pm = new PartitionMetadata(partitionError,
                     new TopicPartition(topic, pInfo.partition()),
                     Optional.of(pInfo.leader().id()),
                     Optional.of(234),
@@ -355,7 +359,7 @@ public class KafkaAdminClientTest {
                     Arrays.stream(pInfo.offlineReplicas()).map(Node::id).collect(Collectors.toList()));
                 pms.add(pm);
             }
-            TopicMetadata tm = new TopicMetadata(error, topic, false, pms);
+            TopicMetadata tm = new TopicMetadata(topicError, topic, false, pms);
             metadata.add(tm);
         }
         return MetadataResponse.prepareResponse(0,
@@ -2342,6 +2346,39 @@ public class KafkaAdminClientTest {
         }
     }
 
+    @Test
+    public void testListOffsetsRetriableErrorOnMetadata() throws Exception {
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.NONE));
+            // metadata refresh because of UNKNOWN_TOPIC_OR_PARTITION
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            // listoffsets response from broker 0
+            Map<TopicPartition, PartitionData> responseData = new HashMap<>();
+            responseData.put(tp0, new PartitionData(Errors.NONE, -1L, 123L, Optional.of(321)));
+            env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData));
+
+            ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.latest()));
+
+            Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get(3, TimeUnit.SECONDS);
+            assertEquals(1, offsets.size());
+            assertEquals(123L, offsets.get(tp0).offset());
+            assertEquals(321, offsets.get(tp0).leaderEpoch().get().intValue());
+            assertEquals(-1L, offsets.get(tp0).timestamp());
+        }
+    }
+
     @Test
     public void testListOffsetsRetriableErrors() throws Exception {