mirror of https://github.com/apache/kafka.git
KAFKA-12339: Add retry to admin client's listOffsets (#10152)
`KafkaAdmin.listOffsets` did not handle topic-level errors, so a topic-level UnknownTopicOrPartitionException could prevent a Connect worker from running when a newly created internal topic was not yet synced to all brokers. The method already handled partition-level retriable errors by retrying, so this change handles topic-level retriable errors in the same way. As a result, a Connect worker can start up while the admin client retries reading to the end of the newly created internal topics until their metadata is synced to all brokers. Author: Chia-Ping Tsai <chia7712@gmail.com> Reviewers: Randall Hauch <rhauch@gmail.com>, Konstantine Karantasis <konstantine@confluent.io>
This commit is contained in:
parent
f4bef70dc6
commit
fe132ee293
|
@ -82,6 +82,7 @@ public final class MetadataOperationContext<T, O extends AbstractOptions<O>> {
|
||||||
|
|
||||||
public static void handleMetadataErrors(MetadataResponse response) {
|
public static void handleMetadataErrors(MetadataResponse response) {
|
||||||
for (TopicMetadata tm : response.topicMetadata()) {
|
for (TopicMetadata tm : response.topicMetadata()) {
|
||||||
|
if (shouldRefreshMetadata(tm.error())) throw tm.error().exception();
|
||||||
for (PartitionMetadata pm : tm.partitionMetadata()) {
|
for (PartitionMetadata pm : tm.partitionMetadata()) {
|
||||||
if (shouldRefreshMetadata(pm.error)) {
|
if (shouldRefreshMetadata(pm.error)) {
|
||||||
throw pm.error.exception();
|
throw pm.error.exception();
|
||||||
|
|
|
@ -342,11 +342,15 @@ public class KafkaAdminClientTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors error) {
|
private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors error) {
|
||||||
|
return prepareMetadataResponse(cluster, error, error);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors topicError, Errors partitionError) {
|
||||||
List<TopicMetadata> metadata = new ArrayList<>();
|
List<TopicMetadata> metadata = new ArrayList<>();
|
||||||
for (String topic : cluster.topics()) {
|
for (String topic : cluster.topics()) {
|
||||||
List<PartitionMetadata> pms = new ArrayList<>();
|
List<PartitionMetadata> pms = new ArrayList<>();
|
||||||
for (PartitionInfo pInfo : cluster.availablePartitionsForTopic(topic)) {
|
for (PartitionInfo pInfo : cluster.availablePartitionsForTopic(topic)) {
|
||||||
PartitionMetadata pm = new PartitionMetadata(error,
|
PartitionMetadata pm = new PartitionMetadata(partitionError,
|
||||||
new TopicPartition(topic, pInfo.partition()),
|
new TopicPartition(topic, pInfo.partition()),
|
||||||
Optional.of(pInfo.leader().id()),
|
Optional.of(pInfo.leader().id()),
|
||||||
Optional.of(234),
|
Optional.of(234),
|
||||||
|
@ -355,7 +359,7 @@ public class KafkaAdminClientTest {
|
||||||
Arrays.stream(pInfo.offlineReplicas()).map(Node::id).collect(Collectors.toList()));
|
Arrays.stream(pInfo.offlineReplicas()).map(Node::id).collect(Collectors.toList()));
|
||||||
pms.add(pm);
|
pms.add(pm);
|
||||||
}
|
}
|
||||||
TopicMetadata tm = new TopicMetadata(error, topic, false, pms);
|
TopicMetadata tm = new TopicMetadata(topicError, topic, false, pms);
|
||||||
metadata.add(tm);
|
metadata.add(tm);
|
||||||
}
|
}
|
||||||
return MetadataResponse.prepareResponse(0,
|
return MetadataResponse.prepareResponse(0,
|
||||||
|
@ -2342,6 +2346,39 @@ public class KafkaAdminClientTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testListOffsetsRetriableErrorOnMetadata() throws Exception {
|
||||||
|
Node node = new Node(0, "localhost", 8120);
|
||||||
|
List<Node> nodes = Collections.singletonList(node);
|
||||||
|
final Cluster cluster = new Cluster(
|
||||||
|
"mockClusterId",
|
||||||
|
nodes,
|
||||||
|
Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
|
||||||
|
Collections.emptySet(),
|
||||||
|
Collections.emptySet(),
|
||||||
|
node);
|
||||||
|
final TopicPartition tp0 = new TopicPartition("foo", 0);
|
||||||
|
|
||||||
|
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
|
||||||
|
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
|
||||||
|
env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.NONE));
|
||||||
|
// metadata refresh because of UNKNOWN_TOPIC_OR_PARTITION
|
||||||
|
env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
|
||||||
|
// listoffsets response from broker 0
|
||||||
|
Map<TopicPartition, PartitionData> responseData = new HashMap<>();
|
||||||
|
responseData.put(tp0, new PartitionData(Errors.NONE, -1L, 123L, Optional.of(321)));
|
||||||
|
env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData));
|
||||||
|
|
||||||
|
ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.latest()));
|
||||||
|
|
||||||
|
Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get(3, TimeUnit.SECONDS);
|
||||||
|
assertEquals(1, offsets.size());
|
||||||
|
assertEquals(123L, offsets.get(tp0).offset());
|
||||||
|
assertEquals(321, offsets.get(tp0).leaderEpoch().get().intValue());
|
||||||
|
assertEquals(-1L, offsets.get(tp0).timestamp());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testListOffsetsRetriableErrors() throws Exception {
|
public void testListOffsetsRetriableErrors() throws Exception {
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue