MINOR; KafkaAdminClient#describeLogDirs should not fail all the futures when only one call fails (#8998)

Reviewers: Colin P. McCabe <cmccabe@apache.org>
This commit is contained in:
David Jacot 2020-07-13 17:00:51 +02:00 committed by GitHub
parent cec5f377b5
commit f40aa33de7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 35 additions and 7 deletions

View File

@ -2272,12 +2272,11 @@ public class KafkaAdminClient extends AdminClient {
public DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers, DescribeLogDirsOptions options) { public DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers, DescribeLogDirsOptions options) {
final Map<Integer, KafkaFutureImpl<Map<String, DescribeLogDirsResponse.LogDirInfo>>> futures = new HashMap<>(brokers.size()); final Map<Integer, KafkaFutureImpl<Map<String, DescribeLogDirsResponse.LogDirInfo>>> futures = new HashMap<>(brokers.size());
for (Integer brokerId: brokers) {
futures.put(brokerId, new KafkaFutureImpl<>());
}
final long now = time.milliseconds(); final long now = time.milliseconds();
for (final Integer brokerId : brokers) { for (final Integer brokerId : brokers) {
KafkaFutureImpl<Map<String, DescribeLogDirsResponse.LogDirInfo>> future = new KafkaFutureImpl<>();
futures.put(brokerId, future);
runnable.call(new Call("describeLogDirs", calcDeadlineMs(now, options.timeoutMs()), runnable.call(new Call("describeLogDirs", calcDeadlineMs(now, options.timeoutMs()),
new ConstantNodeIdProvider(brokerId)) { new ConstantNodeIdProvider(brokerId)) {
@ -2290,7 +2289,6 @@ public class KafkaAdminClient extends AdminClient {
@Override @Override
public void handleResponse(AbstractResponse abstractResponse) { public void handleResponse(AbstractResponse abstractResponse) {
DescribeLogDirsResponse response = (DescribeLogDirsResponse) abstractResponse; DescribeLogDirsResponse response = (DescribeLogDirsResponse) abstractResponse;
KafkaFutureImpl<Map<String, DescribeLogDirsResponse.LogDirInfo>> future = futures.get(brokerId);
if (response.logDirInfos().size() > 0) { if (response.logDirInfos().size() > 0) {
future.complete(response.logDirInfos()); future.complete(response.logDirInfos());
} else { } else {
@ -2300,7 +2298,7 @@ public class KafkaAdminClient extends AdminClient {
} }
@Override @Override
void handleFailure(Throwable throwable) { void handleFailure(Throwable throwable) {
completeAllExceptionally(futures.values(), throwable); future.completeExceptionally(throwable);
} }
}, now); }, now);
} }

View File

@ -84,6 +84,7 @@ import org.apache.kafka.common.message.DescribeAclsResponseData;
import org.apache.kafka.common.message.DescribeConfigsResponseData; import org.apache.kafka.common.message.DescribeConfigsResponseData;
import org.apache.kafka.common.message.DescribeGroupsResponseData; import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember; import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
import org.apache.kafka.common.message.DescribeLogDirsResponseData;
import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult; import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult; import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData; import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
@ -123,6 +124,7 @@ import org.apache.kafka.common.requests.DescribeAclsResponse;
import org.apache.kafka.common.requests.DescribeClientQuotasResponse; import org.apache.kafka.common.requests.DescribeClientQuotasResponse;
import org.apache.kafka.common.requests.DescribeConfigsResponse; import org.apache.kafka.common.requests.DescribeConfigsResponse;
import org.apache.kafka.common.requests.DescribeGroupsResponse; import org.apache.kafka.common.requests.DescribeGroupsResponse;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.apache.kafka.common.requests.ElectLeadersResponse; import org.apache.kafka.common.requests.ElectLeadersResponse;
import org.apache.kafka.common.requests.FindCoordinatorResponse; import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse; import org.apache.kafka.common.requests.IncrementalAlterConfigsResponse;
@ -3744,6 +3746,34 @@ public class KafkaAdminClientTest {
.setErrorCode(error.code())).collect(Collectors.toList()))))); .setErrorCode(error.code())).collect(Collectors.toList())))));
} }
@Test
public void testDescribeLogDirsPartialFailure() throws Exception {
try (AdminClientUnitTestEnv env = mockClientEnv(AdminClientConfig.RETRIES_CONFIG, "0")) {
// As we won't retry, this calls fails immediately with a DisconnectException
env.kafkaClient().prepareResponseFrom(
prepareDescribeLogDirsResponse(Errors.NONE, "/data"),
env.cluster().nodeById(0),
true);
env.kafkaClient().prepareResponseFrom(
prepareDescribeLogDirsResponse(Errors.NONE, "/data"),
env.cluster().nodeById(1));
DescribeLogDirsResult result = env.adminClient().describeLogDirs(Arrays.asList(0, 1));
TestUtils.assertFutureThrows(result.values().get(0), ApiException.class);
assertNotNull(result.values().get(1).get());
}
}
private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir) {
return new DescribeLogDirsResponse(new DescribeLogDirsResponseData()
.setResults(Collections.singletonList(
new DescribeLogDirsResponseData.DescribeLogDirsResult()
.setErrorCode(error.code())
.setLogDir(logDir))));
}
private static MemberDescription convertToMemberDescriptions(DescribedGroupMember member, private static MemberDescription convertToMemberDescriptions(DescribedGroupMember member,
MemberAssignment assignment) { MemberAssignment assignment) {
return new MemberDescription(member.memberId(), return new MemberDescription(member.memberId(),