KAFKA-12879: Revert "KAFKA-12339: Add retry to admin client's listOffsets (#10152)"

This reverts commit fe132ee293.
This commit is contained in:
Randall Hauch 2022-03-10 15:16:24 -06:00
parent dc209f102a
commit d5b53ad132
2 changed files with 2 additions and 40 deletions

View File

@ -82,7 +82,6 @@ public final class MetadataOperationContext<T, O extends AbstractOptions<O>> {
public static void handleMetadataErrors(MetadataResponse response) { public static void handleMetadataErrors(MetadataResponse response) {
for (TopicMetadata tm : response.topicMetadata()) { for (TopicMetadata tm : response.topicMetadata()) {
if (shouldRefreshMetadata(tm.error())) throw tm.error().exception();
for (PartitionMetadata pm : tm.partitionMetadata()) { for (PartitionMetadata pm : tm.partitionMetadata()) {
if (shouldRefreshMetadata(pm.error)) { if (shouldRefreshMetadata(pm.error)) {
throw pm.error.exception(); throw pm.error.exception();

View File

@ -342,15 +342,11 @@ public class KafkaAdminClientTest {
} }
private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors error) { private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors error) {
return prepareMetadataResponse(cluster, error, error);
}
private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors topicError, Errors partitionError) {
List<TopicMetadata> metadata = new ArrayList<>(); List<TopicMetadata> metadata = new ArrayList<>();
for (String topic : cluster.topics()) { for (String topic : cluster.topics()) {
List<PartitionMetadata> pms = new ArrayList<>(); List<PartitionMetadata> pms = new ArrayList<>();
for (PartitionInfo pInfo : cluster.availablePartitionsForTopic(topic)) { for (PartitionInfo pInfo : cluster.availablePartitionsForTopic(topic)) {
PartitionMetadata pm = new PartitionMetadata(partitionError, PartitionMetadata pm = new PartitionMetadata(error,
new TopicPartition(topic, pInfo.partition()), new TopicPartition(topic, pInfo.partition()),
Optional.of(pInfo.leader().id()), Optional.of(pInfo.leader().id()),
Optional.of(234), Optional.of(234),
@ -359,7 +355,7 @@ public class KafkaAdminClientTest {
Arrays.stream(pInfo.offlineReplicas()).map(Node::id).collect(Collectors.toList())); Arrays.stream(pInfo.offlineReplicas()).map(Node::id).collect(Collectors.toList()));
pms.add(pm); pms.add(pm);
} }
TopicMetadata tm = new TopicMetadata(topicError, topic, false, pms); TopicMetadata tm = new TopicMetadata(error, topic, false, pms);
metadata.add(tm); metadata.add(tm);
} }
return MetadataResponse.prepareResponse(0, return MetadataResponse.prepareResponse(0,
@ -2346,39 +2342,6 @@ public class KafkaAdminClientTest {
} }
} }
@Test
public void testListOffsetsRetriableErrorOnMetadata() throws Exception {
Node node = new Node(0, "localhost", 8120);
List<Node> nodes = Collections.singletonList(node);
final Cluster cluster = new Cluster(
"mockClusterId",
nodes,
Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
Collections.emptySet(),
Collections.emptySet(),
node);
final TopicPartition tp0 = new TopicPartition("foo", 0);
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.NONE));
// metadata refresh because of UNKNOWN_TOPIC_OR_PARTITION
env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
// listoffsets response from broker 0
Map<TopicPartition, PartitionData> responseData = new HashMap<>();
responseData.put(tp0, new PartitionData(Errors.NONE, -1L, 123L, Optional.of(321)));
env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData));
ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.latest()));
Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get(3, TimeUnit.SECONDS);
assertEquals(1, offsets.size());
assertEquals(123L, offsets.get(tp0).offset());
assertEquals(321, offsets.get(tp0).leaderEpoch().get().intValue());
assertEquals(-1L, offsets.get(tp0).timestamp());
}
}
@Test @Test
public void testListOffsetsRetriableErrors() throws Exception { public void testListOffsetsRetriableErrors() throws Exception {