MINOR: simplify implementation of ConsumerGroupOperationContext.hasCo… (#9449)

Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
Chia-Ping Tsai 2020-10-22 17:02:13 +08:00 committed by GitHub
parent 94ccd4d25e
commit c283886b8e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 51 additions and 19 deletions

View File

@ -3941,25 +3941,16 @@ public class KafkaAdminClient extends AdminClient {
void handleResponse(AbstractResponse abstractResponse) { void handleResponse(AbstractResponse abstractResponse) {
final OffsetCommitResponse response = (OffsetCommitResponse) abstractResponse; final OffsetCommitResponse response = (OffsetCommitResponse) abstractResponse;
// If coordinator changed since we fetched it, retry Map<Errors, Integer> errorCounts = response.errorCounts();
if (ConsumerGroupOperationContext.hasCoordinatorMoved(response)) { // 1) If coordinator changed since we fetched it, retry
// 2) If there is a coordinator error, retry
if (ConsumerGroupOperationContext.hasCoordinatorMoved(errorCounts) ||
ConsumerGroupOperationContext.shouldRefreshCoordinator(errorCounts)) {
Call call = getAlterConsumerGroupOffsetsCall(context, offsets); Call call = getAlterConsumerGroupOffsetsCall(context, offsets);
rescheduleFindCoordinatorTask(context, () -> call, this); rescheduleFindCoordinatorTask(context, () -> call, this);
return; return;
} }
// If there is a coordinator error, retry
for (OffsetCommitResponseTopic topic : response.data().topics()) {
for (OffsetCommitResponsePartition partition : topic.partitions()) {
Errors error = Errors.forCode(partition.errorCode());
if (ConsumerGroupOperationContext.shouldRefreshCoordinator(error)) {
Call call = getAlterConsumerGroupOffsetsCall(context, offsets);
rescheduleFindCoordinatorTask(context, () -> call, this);
return;
}
}
}
final Map<TopicPartition, Errors> partitions = new HashMap<>(); final Map<TopicPartition, Errors> partitions = new HashMap<>();
for (OffsetCommitResponseTopic topic : response.data().topics()) { for (OffsetCommitResponseTopic topic : response.data().topics()) {
for (OffsetCommitResponsePartition partition : topic.partitions()) { for (OffsetCommitResponsePartition partition : topic.partitions()) {

View File

@ -17,6 +17,7 @@
package org.apache.kafka.clients.admin.internals; package org.apache.kafka.clients.admin.internals;
import java.util.Map;
import java.util.Optional; import java.util.Optional;
import org.apache.kafka.clients.admin.AbstractOptions; import org.apache.kafka.clients.admin.AbstractOptions;
@ -76,12 +77,15 @@ public final class ConsumerGroupOperationContext<T, O extends AbstractOptions<O>
} }
public static boolean hasCoordinatorMoved(AbstractResponse response) { public static boolean hasCoordinatorMoved(AbstractResponse response) {
return response.errorCounts().keySet() return hasCoordinatorMoved(response.errorCounts());
.stream()
.anyMatch(error -> error == Errors.NOT_COORDINATOR);
} }
public static boolean shouldRefreshCoordinator(Errors error) { public static boolean hasCoordinatorMoved(Map<Errors, Integer> errorCounts) {
return error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.COORDINATOR_NOT_AVAILABLE; return errorCounts.containsKey(Errors.NOT_COORDINATOR);
}
public static boolean shouldRefreshCoordinator(Map<Errors, Integer> errorCounts) {
return errorCounts.containsKey(Errors.COORDINATOR_LOAD_IN_PROGRESS) ||
errorCounts.containsKey(Errors.COORDINATOR_NOT_AVAILABLE);
} }
} }

View File

@ -23,6 +23,7 @@ import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions; import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults; import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.internals.ConsumerGroupOperationContext;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
@ -115,10 +116,12 @@ import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResp
import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection; import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection;
import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.quota.ClientQuotaAlteration; import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity; import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.quota.ClientQuotaFilter; import org.apache.kafka.common.quota.ClientQuotaFilter;
import org.apache.kafka.common.quota.ClientQuotaFilterComponent; import org.apache.kafka.common.quota.ClientQuotaFilterComponent;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.AlterClientQuotasResponse; import org.apache.kafka.common.requests.AlterClientQuotasResponse;
import org.apache.kafka.common.requests.AlterPartitionReassignmentsResponse; import org.apache.kafka.common.requests.AlterPartitionReassignmentsResponse;
import org.apache.kafka.common.requests.AlterReplicaLogDirsResponse; import org.apache.kafka.common.requests.AlterReplicaLogDirsResponse;
@ -4910,6 +4913,40 @@ public class KafkaAdminClientTest {
} }
} }
@Test
public void testHasCoordinatorMoved() {
Map<Errors, Integer> errors = new HashMap<>();
AbstractResponse response = new AbstractResponse() {
@Override
public Map<Errors, Integer> errorCounts() {
return errors;
}
@Override
protected Struct toStruct(short version) {
return null;
}
};
assertFalse(ConsumerGroupOperationContext.hasCoordinatorMoved(response));
errors.put(Errors.NOT_COORDINATOR, 1);
assertTrue(ConsumerGroupOperationContext.hasCoordinatorMoved(response));
}
@Test
public void testShouldRefreshCoordinator() {
Map<Errors, Integer> errorCounts = new HashMap<>();
assertFalse(ConsumerGroupOperationContext.shouldRefreshCoordinator(errorCounts));
errorCounts.put(Errors.COORDINATOR_LOAD_IN_PROGRESS, 1);
assertTrue(ConsumerGroupOperationContext.shouldRefreshCoordinator(errorCounts));
errorCounts.clear();
errorCounts.put(Errors.COORDINATOR_NOT_AVAILABLE, 1);
assertTrue(ConsumerGroupOperationContext.shouldRefreshCoordinator(errorCounts));
}
private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir) { private DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir) {
return new DescribeLogDirsResponse(new DescribeLogDirsResponseData() return new DescribeLogDirsResponse(new DescribeLogDirsResponseData()
.setResults(Collections.singletonList( .setResults(Collections.singletonList(