KAFKA-9558; Fix retry logic in KafkaAdminClient listOffsets (#8119)

This PR fixes the retry logic for `getListOffsetsCalls`. Previously, if there were partitions with errors, it would only pass the current call object to retry after a metadata refresh. However, if there was a leader change, the call object never got updated with the correct leader node to query. This PR fixes that by making another call to `getListOffsetsCalls` with only the error topic partitions as the next calls to be made after the metadata refresh. In addition, a new test covers the scenario where a leader change occurs.

Reviewers: Jason Gustafson <jason@confluent.io>
This commit is contained in:
Sanjana Kaundinya 2020-02-19 09:11:45 -08:00 committed by GitHub
parent 913c61934e
commit eb7dfef245
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 120 additions and 8 deletions

View File

@ -3785,7 +3785,7 @@ public class KafkaAdminClient extends AdminClient {
@Override
void handleResponse(AbstractResponse abstractResponse) {
ListOffsetResponse response = (ListOffsetResponse) abstractResponse;
Set<TopicPartition> partitionsWithErrors = new HashSet<>();
Map<TopicPartition, OffsetSpec> retryTopicPartitionOffsets = new HashMap<>();
for (Entry<TopicPartition, PartitionData> result : response.responseData().entrySet()) {
TopicPartition tp = result.getKey();
@ -3793,8 +3793,11 @@ public class KafkaAdminClient extends AdminClient {
KafkaFutureImpl<ListOffsetsResultInfo> future = futures.get(tp);
Errors error = partitionData.error;
if (MetadataOperationContext.shouldRefreshMetadata(error)) {
partitionsWithErrors.add(tp);
OffsetSpec offsetRequestSpec = topicPartitionOffsets.get(tp);
if (offsetRequestSpec == null) {
future.completeExceptionally(new KafkaException("Unexpected topic partition " + tp + " in broker response!"));
} else if (MetadataOperationContext.shouldRefreshMetadata(error)) {
retryTopicPartitionOffsets.put(tp, offsetRequestSpec);
} else if (error == Errors.NONE) {
future.complete(new ListOffsetsResultInfo(partitionData.offset, partitionData.timestamp, partitionData.leaderEpoch));
} else {
@ -3802,12 +3805,12 @@ public class KafkaAdminClient extends AdminClient {
}
}
if (!partitionsWithErrors.isEmpty()) {
partitionsToQuery.keySet().retainAll(partitionsWithErrors);
Set<String> retryTopics = partitionsWithErrors.stream().map(tp -> tp.topic()).collect(Collectors.toSet());
if (!retryTopicPartitionOffsets.isEmpty()) {
Set<String> retryTopics = retryTopicPartitionOffsets.keySet().stream().map(
TopicPartition::topic).collect(Collectors.toSet());
MetadataOperationContext<ListOffsetsResultInfo, ListOffsetsOptions> retryContext =
new MetadataOperationContext<>(retryTopics, context.options(), context.deadline(), futures);
rescheduleMetadataTask(retryContext, () -> Collections.singletonList(this));
new MetadataOperationContext<>(retryTopics, context.options(), context.deadline(), futures);
rescheduleMetadataTask(retryContext, () -> getListOffsetsCalls(retryContext, retryTopicPartitionOffsets, futures));
}
}

View File

@ -2495,6 +2495,115 @@ public class KafkaAdminClientTest {
}
}
@Test
public void testListOffsetsWithMultiplePartitionsLeaderChange() throws Exception {
Node node0 = new Node(0, "localhost", 8120);
Node node1 = new Node(1, "localhost", 8121);
Node node2 = new Node(2, "localhost", 8122);
List<Node> nodes = Arrays.asList(node0, node1, node2);
final PartitionInfo oldPInfo1 = new PartitionInfo("foo", 0, node0,
new Node[]{node0, node1, node2}, new Node[]{node0, node1, node2});
final PartitionInfo oldPnfo2 = new PartitionInfo("foo", 1, node0,
new Node[]{node0, node1, node2}, new Node[]{node0, node1, node2});
List<PartitionInfo> oldPInfos = Arrays.asList(oldPInfo1, oldPnfo2);
final Cluster oldCluster = new Cluster("mockClusterId", nodes, oldPInfos,
Collections.emptySet(), Collections.emptySet(), node0);
final TopicPartition tp0 = new TopicPartition("foo", 0);
final TopicPartition tp1 = new TopicPartition("foo", 1);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(oldCluster)) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(prepareMetadataResponse(oldCluster, Errors.NONE));
Map<TopicPartition, PartitionData> responseData = new HashMap<>();
responseData.put(tp0, new PartitionData(Errors.NOT_LEADER_FOR_PARTITION, -1L, 345L, Optional.of(543)));
responseData.put(tp1, new PartitionData(Errors.LEADER_NOT_AVAILABLE, -2L, 123L, Optional.of(456)));
env.kafkaClient().prepareResponseFrom(new ListOffsetResponse(responseData), node0);
final PartitionInfo newPInfo1 = new PartitionInfo("foo", 0, node1,
new Node[]{node0, node1, node2}, new Node[]{node0, node1, node2});
final PartitionInfo newPInfo2 = new PartitionInfo("foo", 1, node2,
new Node[]{node0, node1, node2}, new Node[]{node0, node1, node2});
List<PartitionInfo> newPInfos = Arrays.asList(newPInfo1, newPInfo2);
final Cluster newCluster = new Cluster("mockClusterId", nodes, newPInfos,
Collections.emptySet(), Collections.emptySet(), node0);
env.kafkaClient().prepareResponse(prepareMetadataResponse(newCluster, Errors.NONE));
responseData = new HashMap<>();
responseData.put(tp0, new PartitionData(Errors.NONE, -1L, 345L, Optional.of(543)));
env.kafkaClient().prepareResponseFrom(new ListOffsetResponse(responseData), node1);
responseData = new HashMap<>();
responseData.put(tp1, new PartitionData(Errors.NONE, -2L, 123L, Optional.of(456)));
env.kafkaClient().prepareResponseFrom(new ListOffsetResponse(responseData), node2);
Map<TopicPartition, OffsetSpec> partitions = new HashMap<>();
partitions.put(tp0, OffsetSpec.latest());
partitions.put(tp1, OffsetSpec.latest());
ListOffsetsResult result = env.adminClient().listOffsets(partitions);
Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get();
assertFalse(offsets.isEmpty());
assertEquals(345L, offsets.get(tp0).offset());
assertEquals(543, offsets.get(tp0).leaderEpoch().get().intValue());
assertEquals(-1L, offsets.get(tp0).timestamp());
assertEquals(123L, offsets.get(tp1).offset());
assertEquals(456, offsets.get(tp1).leaderEpoch().get().intValue());
assertEquals(-2L, offsets.get(tp1).timestamp());
}
}
@Test
public void testListOffsetsWithLeaderChange() throws Exception {
    // Three-broker cluster with a single partition "foo"-0, initially led by broker 0.
    final Node broker0 = new Node(0, "localhost", 8120);
    final Node broker1 = new Node(1, "localhost", 8121);
    final Node broker2 = new Node(2, "localhost", 8122);
    final List<Node> brokers = Arrays.asList(broker0, broker1, broker2);
    final Node[] replicas = new Node[]{broker0, broker1, broker2};
    final PartitionInfo initialPartitionInfo = new PartitionInfo("foo", 0, broker0, replicas, replicas);
    final Cluster initialCluster = new Cluster("mockClusterId", brokers, singletonList(initialPartitionInfo),
        Collections.emptySet(), Collections.emptySet(), broker0);
    final TopicPartition fooPartition0 = new TopicPartition("foo", 0);
    try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(initialCluster)) {
        env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
        env.kafkaClient().prepareResponse(prepareMetadataResponse(initialCluster, Errors.NONE));
        // The first ListOffsets attempt against broker 0 fails with NOT_LEADER_FOR_PARTITION,
        // which should force a metadata refresh before retrying.
        Map<TopicPartition, PartitionData> listOffsetsData = new HashMap<>();
        listOffsetsData.put(fooPartition0, new PartitionData(Errors.NOT_LEADER_FOR_PARTITION, -1L, 345L, Optional.of(543)));
        env.kafkaClient().prepareResponseFrom(new ListOffsetResponse(listOffsetsData), broker0);
        // Refreshed metadata reports broker 1 as the new leader for the partition.
        final PartitionInfo movedPartitionInfo = new PartitionInfo("foo", 0, broker1, replicas, replicas);
        final Cluster refreshedCluster = new Cluster("mockClusterId", brokers, singletonList(movedPartitionInfo),
            Collections.emptySet(), Collections.emptySet(), broker0);
        env.kafkaClient().prepareResponse(prepareMetadataResponse(refreshedCluster, Errors.NONE));
        // The retried request must be routed to the new leader, broker 1.
        listOffsetsData = new HashMap<>();
        listOffsetsData.put(fooPartition0, new PartitionData(Errors.NONE, -2L, 123L, Optional.of(456)));
        env.kafkaClient().prepareResponseFrom(new ListOffsetResponse(listOffsetsData), broker1);
        Map<TopicPartition, OffsetSpec> offsetQuery = new HashMap<>();
        offsetQuery.put(fooPartition0, OffsetSpec.latest());
        ListOffsetsResult result = env.adminClient().listOffsets(offsetQuery);
        Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get();
        // The final values must come from the new leader's response.
        assertFalse(offsets.isEmpty());
        assertEquals(123L, offsets.get(fooPartition0).offset());
        assertEquals(456, offsets.get(fooPartition0).leaderEpoch().get().intValue());
        assertEquals(-2L, offsets.get(fooPartition0).timestamp());
    }
}
@Test
public void testListOffsetsMetadataNonRetriableErrors() throws Exception {