diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java index 16af1966719..960d953b584 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java @@ -61,7 +61,7 @@ public final class ListOffsetsHandler extends Batched topicError.exception( + "Failed to fetch metadata for partition " + tp + " because metadata for topic `" + topic + "` could not be found")); + break; + } case LEADER_NOT_AVAILABLE: case BROKER_NOT_AVAILABLE: log.debug("Metadata request for topic {} returned topic-level error {}. Will retry", @@ -124,6 +137,7 @@ public class PartitionLeaderStrategy implements AdminApiLookupStrategy metadata = new ArrayList<>(); for (String topic : cluster.topics()) { List pms = new ArrayList<>(); for (PartitionInfo pInfo : cluster.availablePartitionsForTopic(topic)) { MetadataResponsePartition pm = new MetadataResponsePartition() - .setErrorCode(error.code()) + .setErrorCode(partitionError.code()) .setPartitionIndex(pInfo.partition()) .setLeaderId(pInfo.leader().id()) .setLeaderEpoch(234) @@ -573,7 +579,7 @@ public class KafkaAdminClientTest { pms.add(pm); } MetadataResponseTopic tm = new MetadataResponseTopic() - .setErrorCode(error.code()) + .setErrorCode(topicError.code()) .setName(topic) .setIsInternal(false) .setPartitions(pms); @@ -5462,7 +5468,6 @@ public class KafkaAdminClientTest { @Test public void testListOffsetsMetadataRetriableErrors() throws Exception { - Node node0 = new Node(0, "localhost", 8120); Node node1 = new Node(1, "localhost", 8121); List nodes = Arrays.asList(node0, node1); @@ -5485,7 +5490,8 @@ public class KafkaAdminClientTest { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.LEADER_NOT_AVAILABLE)); - env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.UNKNOWN_TOPIC_OR_PARTITION)); + // We retry when a partition of a topic (but not the topic itself) is unknown + env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE, Errors.UNKNOWN_TOPIC_OR_PARTITION)); env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE)); // listoffsets response from broker 0 @@ -5636,9 +5642,13 @@ public class KafkaAdminClientTest { } } - @Test - public void testListOffsetsMetadataNonRetriableErrors() throws Exception { - + @ParameterizedTest + @MethodSource("listOffsetsMetadataNonRetriableErrors") + public void testListOffsetsMetadataNonRetriableErrors( + Errors topicMetadataError, + Errors partitionMetadataError, + Class expectedFailure + ) throws Exception { Node node0 = new Node(0, "localhost", 8120); Node node1 = new Node(1, "localhost", 8121); List nodes = Arrays.asList(node0, node1); @@ -5654,20 +5664,51 @@ public class KafkaAdminClientTest { node0); final TopicPartition tp1 = new TopicPartition("foo", 0); + final MetadataResponse preparedResponse = prepareMetadataResponse( + cluster, topicMetadataError, partitionMetadataError + ); try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); - env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.TOPIC_AUTHORIZATION_FAILED)); + env.kafkaClient().prepareResponse(preparedResponse); Map partitions = new HashMap<>(); partitions.put(tp1, OffsetSpec.latest()); ListOffsetsResult result = env.adminClient().listOffsets(partitions); - TestUtils.assertFutureError(result.all(), TopicAuthorizationException.class); + TestUtils.assertFutureError(result.all(), expectedFailure); } } + private static Stream listOffsetsMetadataNonRetriableErrors() { + return Stream.of( + Arguments.of( + Errors.TOPIC_AUTHORIZATION_FAILED, + Errors.TOPIC_AUTHORIZATION_FAILED, + TopicAuthorizationException.class + ), + Arguments.of( + // We fail fast when the entire topic is unknown... + Errors.UNKNOWN_TOPIC_OR_PARTITION, + Errors.NONE, + UnknownTopicOrPartitionException.class + ), + Arguments.of( + // ... even if a partition in the topic is also somehow reported as unknown... + Errors.UNKNOWN_TOPIC_OR_PARTITION, + Errors.UNKNOWN_TOPIC_OR_PARTITION, + UnknownTopicOrPartitionException.class + ), + Arguments.of( + // ... or a partition in the topic has a different, otherwise-retriable error + Errors.UNKNOWN_TOPIC_OR_PARTITION, + Errors.LEADER_NOT_AVAILABLE, + UnknownTopicOrPartitionException.class + ) + ); + } + @Test public void testListOffsetsPartialResponse() throws Exception { Node node0 = new Node(0, "localhost", 8120);