From f40aa33de792f74c3bc8078c61d5cac67df963ba Mon Sep 17 00:00:00 2001 From: David Jacot Date: Mon, 13 Jul 2020 17:00:51 +0200 Subject: [PATCH] MINOR; KafkaAdminClient#describeLogDirs should not fail all the futures when only one call fails (#8998) Reviewers: Colin P. McCabe --- .../kafka/clients/admin/KafkaAdminClient.java | 12 ++++---- .../clients/admin/KafkaAdminClientTest.java | 30 +++++++++++++++++++ 2 files changed, 35 insertions(+), 7 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 101a6e9076e..8bd8142aff8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -2272,12 +2272,11 @@ public class KafkaAdminClient extends AdminClient { public DescribeLogDirsResult describeLogDirs(Collection brokers, DescribeLogDirsOptions options) { final Map>> futures = new HashMap<>(brokers.size()); - for (Integer brokerId: brokers) { - futures.put(brokerId, new KafkaFutureImpl<>()); - } - final long now = time.milliseconds(); - for (final Integer brokerId: brokers) { + for (final Integer brokerId : brokers) { + KafkaFutureImpl> future = new KafkaFutureImpl<>(); + futures.put(brokerId, future); + runnable.call(new Call("describeLogDirs", calcDeadlineMs(now, options.timeoutMs()), new ConstantNodeIdProvider(brokerId)) { @@ -2290,7 +2289,6 @@ public class KafkaAdminClient extends AdminClient { @Override public void handleResponse(AbstractResponse abstractResponse) { DescribeLogDirsResponse response = (DescribeLogDirsResponse) abstractResponse; - KafkaFutureImpl> future = futures.get(brokerId); if (response.logDirInfos().size() > 0) { future.complete(response.logDirInfos()); } else { @@ -2300,7 +2298,7 @@ public class KafkaAdminClient extends AdminClient { } @Override void handleFailure(Throwable throwable) { - completeAllExceptionally(futures.values(), throwable); + future.completeExceptionally(throwable); } }, now); } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 15e22b3dd8a..f83624501a2 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -84,6 +84,7 @@ import org.apache.kafka.common.message.DescribeAclsResponseData; import org.apache.kafka.common.message.DescribeConfigsResponseData; import org.apache.kafka.common.message.DescribeGroupsResponseData; import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember; +import org.apache.kafka.common.message.DescribeLogDirsResponseData; import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult; import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult; import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData; @@ -123,6 +124,7 @@ import org.apache.kafka.common.requests.DescribeAclsResponse; import org.apache.kafka.common.requests.DescribeClientQuotasResponse; import org.apache.kafka.common.requests.DescribeConfigsResponse; import org.apache.kafka.common.requests.DescribeGroupsResponse; +import org.apache.kafka.common.requests.DescribeLogDirsResponse; import org.apache.kafka.common.requests.ElectLeadersResponse; import org.apache.kafka.common.requests.FindCoordinatorResponse; import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse; @@ -3744,6 +3746,34 @@ public class KafkaAdminClientTest { .setErrorCode(error.code())).collect(Collectors.toList()))))); } + @Test + public void testDescribeLogDirsPartialFailure() throws Exception { + try (AdminClientUnitTestEnv env = mockClientEnv(AdminClientConfig.RETRIES_CONFIG, "0")) { + // As we won't retry, this calls fails immediately with a DisconnectException + env.kafkaClient().prepareResponseFrom( + prepareDescribeLogDirsResponse(Errors.NONE, "/data"), + env.cluster().nodeById(0), + true); + + env.kafkaClient().prepareResponseFrom( + prepareDescribeLogDirsResponse(Errors.NONE, "/data"), + env.cluster().nodeById(1)); + + DescribeLogDirsResult result = env.adminClient().describeLogDirs(Arrays.asList(0, 1)); + + TestUtils.assertFutureThrows(result.values().get(0), ApiException.class); + assertNotNull(result.values().get(1).get()); + } + } + + private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir) { + return new DescribeLogDirsResponse(new DescribeLogDirsResponseData() + .setResults(Collections.singletonList( + new DescribeLogDirsResponseData.DescribeLogDirsResult() + .setErrorCode(error.code()) + .setLogDir(logDir)))); + } + private static MemberDescription convertToMemberDescriptions(DescribedGroupMember member, MemberAssignment assignment) { return new MemberDescription(member.memberId(),