KAFKA-6789; Handle retriable group errors in AdminClient API (#5578)

This patch adds support for retrying all AdminClient group operations after COORDINATOR_LOAD_IN_PROGRESS and COORDINATOR_NOT_AVAILABLE errors. Previously we only had retry logic for FindCoordinator failures.

Reviewers: Yishun Guan <gyishun@gmail.com>, Viktor Somogyi <viktorsomogyi@gmail.com>, Jason Gustafson <jason@confluent.io>
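
For illustration only (not part of this patch): a minimal sketch of an application-side caller exercising one of these operations. With this change, coordinator load/availability errors are retried inside the AdminClient, bounded by the retries config, before the returned future is completed. The bootstrap address, group id, and retry count below are assumptions for the example.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;

public class DescribeGroupExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumed broker address and retry bound; adjust for your environment.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(AdminClientConfig.RETRIES_CONFIG, "5");
        try (AdminClient admin = AdminClient.create(props)) {
            // After this patch, COORDINATOR_LOAD_IN_PROGRESS and COORDINATOR_NOT_AVAILABLE
            // returned by the group coordinator cause an internal retry instead of
            // failing this future immediately.
            ConsumerGroupDescription description = admin
                .describeConsumerGroups(Collections.singletonList("group-0"))
                .describedGroups()
                .get("group-0")
                .get();
            System.out.println(description);
        }
    }
}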
Authored by Manikumar Reddy on 2019-05-10 04:01:10 +05:30; committed by Jason Gustafson
commit 5a8d74e151 (parent a97e55b838)
3 changed files with 116 additions and 68 deletions

KafkaAdminClient.java

@@ -2551,7 +2551,7 @@ public class KafkaAdminClient extends AdminClient {
            void handleResponse(AbstractResponse abstractResponse) {
                final FindCoordinatorResponse fcResponse = (FindCoordinatorResponse) abstractResponse;
-               if (handleFindCoordinatorError(fcResponse, futures.get(groupId)))
+               if (handleGroupRequestError(fcResponse.error(), futures.get(groupId)))
                    return;
                final long nowDescribeConsumerGroups = time.milliseconds();
@@ -2577,10 +2577,10 @@ public class KafkaAdminClient extends AdminClient {
                        .findFirst().get();
                    final Errors groupError = Errors.forCode(describedGroup.errorCode());
-                   if (groupError != Errors.NONE) {
-                       // TODO: KAFKA-6789, we can retry based on the error code
-                       future.completeExceptionally(groupError.exception());
-                   } else {
+                   if (handleGroupRequestError(groupError, future))
+                       return;
                    final String protocolType = describedGroup.protocolType();
                    if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) {
                        final List<DescribedGroupMember> members = describedGroup.members();
@@ -2610,7 +2610,6 @@ public class KafkaAdminClient extends AdminClient {
                        future.complete(consumerGroupDescription);
                    }
                }
-               }
            @Override
            void handleFailure(Throwable throwable) {
@@ -2641,11 +2640,10 @@ public class KafkaAdminClient extends AdminClient {
            .collect(Collectors.toSet());
    }
-   private boolean handleFindCoordinatorError(FindCoordinatorResponse response, KafkaFutureImpl<?> future) {
-       Errors error = response.error();
-       if (error.exception() instanceof RetriableException) {
+   private boolean handleGroupRequestError(Errors error, KafkaFutureImpl<?> future) {
+       if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.COORDINATOR_NOT_AVAILABLE) {
            throw error.exception();
-       } else if (response.hasError()) {
+       } else if (error != Errors.NONE) {
            future.completeExceptionally(error.exception());
            return true;
        }
@@ -2797,7 +2795,7 @@ public class KafkaAdminClient extends AdminClient {
            void handleResponse(AbstractResponse abstractResponse) {
                final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
-               if (handleFindCoordinatorError(response, groupOffsetListingFuture))
+               if (handleGroupRequestError(response.error(), groupOffsetListingFuture))
                    return;
                final long nowListConsumerGroupOffsets = time.milliseconds();
@@ -2815,9 +2813,9 @@ public class KafkaAdminClient extends AdminClient {
                final OffsetFetchResponse response = (OffsetFetchResponse) abstractResponse;
                final Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing = new HashMap<>();
-               if (response.hasError()) {
-                   groupOffsetListingFuture.completeExceptionally(response.error().exception());
-               } else {
+               if (handleGroupRequestError(response.error(), groupOffsetListingFuture))
+                   return;
                for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry :
                        response.responseData().entrySet()) {
                    final TopicPartition topicPartition = entry.getKey();
@@ -2835,7 +2833,6 @@ public class KafkaAdminClient extends AdminClient {
                    }
                    groupOffsetListingFuture.complete(groupOffsetsListing);
                }
-               }
            @Override
            void handleFailure(Throwable throwable) {
@@ -2891,7 +2888,7 @@ public class KafkaAdminClient extends AdminClient {
            void handleResponse(AbstractResponse abstractResponse) {
                final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
-               if (handleFindCoordinatorError(response, futures.get(groupId)))
+               if (handleGroupRequestError(response.error(), futures.get(groupId)))
                    return;
                final long nowDeleteConsumerGroups = time.milliseconds();
@@ -2912,12 +2909,11 @@ public class KafkaAdminClient extends AdminClient {
                    KafkaFutureImpl<Void> future = futures.get(groupId);
                    final Errors groupError = response.get(groupId);
-                   if (groupError != Errors.NONE) {
-                       future.completeExceptionally(groupError.exception());
-                   } else {
+                   if (handleGroupRequestError(groupError, future))
+                       return;
                    future.complete(null);
                }
-               }
            @Override
            void handleFailure(Throwable throwable) {

ListGroupsResponse.java

@@ -62,7 +62,7 @@ public class ListGroupsResponse extends AbstractResponse {
    /**
     * Possible error codes:
     *
-    * COORDINATOR_LOADING_IN_PROGRESS (14)
+    * COORDINATOR_LOAD_IN_PROGRESS (14)
     * COORDINATOR_NOT_AVAILABLE (15)
     * AUTHORIZATION_FAILED (29)
     */

KafkaAdminClientTest.java

@@ -942,7 +942,7 @@ public class KafkaAdminClientTest {
                Collections.emptySet(),
                Collections.emptySet(), nodes.get(0));
-       try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
+       try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
            // Empty metadata response should be retried
@@ -1076,9 +1076,37 @@ public class KafkaAdminClientTest {
        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-           env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+           //Retriable FindCoordinatorResponse errors should be retried
+           env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode()));
+           env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode()));
+           env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, env.cluster().controller()));
            DescribeGroupsResponseData data = new DescribeGroupsResponseData();
+           //Retriable errors should be retried
+           data.groups().add(DescribeGroupsResponse.groupMetadata(
+               "group-0",
+               Errors.COORDINATOR_LOAD_IN_PROGRESS,
+               "",
+               "",
+               "",
+               Collections.emptyList(),
+               Collections.emptySet()));
+           env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data));
+           data = new DescribeGroupsResponseData();
+           data.groups().add(DescribeGroupsResponse.groupMetadata(
+               "group-0",
+               Errors.COORDINATOR_NOT_AVAILABLE,
+               "",
+               "",
+               "",
+               Collections.emptyList(),
+               Collections.emptySet()));
+           env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data));
+           data = new DescribeGroupsResponseData();
            TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
            TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
            TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 2);
@@ -1143,7 +1171,14 @@ public class KafkaAdminClientTest {
        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-           env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+           //Retriable FindCoordinatorResponse errors should be retried
+           env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode()));
+           env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, env.cluster().controller()));
+           //Retriable errors should be retried
+           env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.COORDINATOR_NOT_AVAILABLE, Collections.emptyMap()));
+           env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Collections.emptyMap()));
            TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
            TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
@@ -1192,9 +1227,9 @@ public class KafkaAdminClientTest {
            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
-           final Map<String, Errors> response = new HashMap<>();
-           response.put("group-0", Errors.NONE);
-           env.kafkaClient().prepareResponse(new DeleteGroupsResponse(response));
+           final Map<String, Errors> validResponse = new HashMap<>();
+           validResponse.put("group-0", Errors.NONE);
+           env.kafkaClient().prepareResponse(new DeleteGroupsResponse(validResponse));
            final DeleteConsumerGroupsResult result = env.adminClient().deleteConsumerGroups(groupIds);
@@ -1207,6 +1242,23 @@ public class KafkaAdminClientTest {
            final DeleteConsumerGroupsResult errorResult = env.adminClient().deleteConsumerGroups(groupIds);
            TestUtils.assertFutureError(errorResult.deletedGroups().get("group-0"), GroupAuthorizationException.class);
+           //Retriable errors should be retried
+           env.kafkaClient().prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, env.cluster().controller()));
+           final Map<String, Errors> errorResponse1 = new HashMap<>();
+           errorResponse1.put("group-0", Errors.COORDINATOR_NOT_AVAILABLE);
+           env.kafkaClient().prepareResponse(new DeleteGroupsResponse(errorResponse1));
+           final Map<String, Errors> errorResponse2 = new HashMap<>();
+           errorResponse2.put("group-0", Errors.COORDINATOR_LOAD_IN_PROGRESS);
+           env.kafkaClient().prepareResponse(new DeleteGroupsResponse(errorResponse2));
+           env.kafkaClient().prepareResponse(new DeleteGroupsResponse(validResponse));
+           final DeleteConsumerGroupsResult errorResult1 = env.adminClient().deleteConsumerGroups(groupIds);
+           final KafkaFuture<Void> errorResults = errorResult1.deletedGroups().get("group-0");
+           assertNull(errorResults.get());
        }
    }