mirror of https://github.com/apache/kafka.git
KAFKA-15425: Fail fast in Admin::listOffsets when topic (but not partition) metadata is not found (#14314)
This restores previous behavior for Admin::listOffsets, which was to fail immediately if topic metadata could not be found, and only retry if metadata for one or more specific partitions could not be found. There is a subtle difference here: prior to https://github.com/apache/kafka/pull/13432, the operation would be retried if any metadata error was reported for any individual topic partition, even if an error was also reported for the entire topic. With this change, the operation always fails if an error is reported for the entire topic, even if an error is also reported for one or more individual topic partitions. I am not aware of any cases where brokers might return both topic- and topic partition-level errors for a metadata request, and if there are none, then this change should be safe. However, if there are such cases, we may need to refine this PR to remove the discrepancy in behavior. Reviewers: Justine Olshan <jolshan@confluent.io>
This commit is contained in:
parent
01c7c7a399
commit
77a91be22e
|
|
@ -61,7 +61,7 @@ public final class ListOffsetsHandler extends Batched<TopicPartition, ListOffset
|
|||
this.offsetTimestampsByPartition = offsetTimestampsByPartition;
|
||||
this.options = options;
|
||||
this.log = logContext.logger(ListOffsetsHandler.class);
|
||||
this.lookupStrategy = new PartitionLeaderStrategy(logContext);
|
||||
this.lookupStrategy = new PartitionLeaderStrategy(logContext, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -42,9 +42,15 @@ public class PartitionLeaderStrategy implements AdminApiLookupStrategy<TopicPart
|
|||
};
|
||||
|
||||
private final Logger log;
|
||||
private final boolean tolerateUnknownTopics;
|
||||
|
||||
public PartitionLeaderStrategy(LogContext logContext) {
|
||||
this(logContext, true);
|
||||
}
|
||||
|
||||
public PartitionLeaderStrategy(LogContext logContext, boolean tolerateUnknownTopics) {
|
||||
this.log = logContext.logger(PartitionLeaderStrategy.class);
|
||||
this.tolerateUnknownTopics = tolerateUnknownTopics;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -64,6 +70,7 @@ public class PartitionLeaderStrategy implements AdminApiLookupStrategy<TopicPart
|
|||
return new MetadataRequest.Builder(request);
|
||||
}
|
||||
|
||||
@SuppressWarnings("fallthrough")
|
||||
private void handleTopicError(
|
||||
String topic,
|
||||
Errors topicError,
|
||||
|
|
@ -72,6 +79,12 @@ public class PartitionLeaderStrategy implements AdminApiLookupStrategy<TopicPart
|
|||
) {
|
||||
switch (topicError) {
|
||||
case UNKNOWN_TOPIC_OR_PARTITION:
|
||||
if (!tolerateUnknownTopics) {
|
||||
log.error("Received unknown topic error for topic {}", topic, topicError.exception());
|
||||
failAllPartitionsForTopic(topic, requestPartitions, failed, tp -> topicError.exception(
|
||||
"Failed to fetch metadata for partition " + tp + " because metadata for topic `" + topic + "` could not be found"));
|
||||
break;
|
||||
}
|
||||
case LEADER_NOT_AVAILABLE:
|
||||
case BROKER_NOT_AVAILABLE:
|
||||
log.debug("Metadata request for topic {} returned topic-level error {}. Will retry",
|
||||
|
|
@ -124,6 +137,7 @@ public class PartitionLeaderStrategy implements AdminApiLookupStrategy<TopicPart
|
|||
case LEADER_NOT_AVAILABLE:
|
||||
case BROKER_NOT_AVAILABLE:
|
||||
case KAFKA_STORAGE_ERROR:
|
||||
case UNKNOWN_TOPIC_OR_PARTITION:
|
||||
log.debug("Metadata request for partition {} returned partition-level error {}. Will retry",
|
||||
topicPartition, partitionError);
|
||||
break;
|
||||
|
|
|
|||
|
|
@ -213,6 +213,8 @@ import org.apache.kafka.test.TestUtils;
|
|||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.Timeout;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
import org.junit.jupiter.params.provider.ValueSource;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
|
@ -558,12 +560,16 @@ public class KafkaAdminClientTest {
|
|||
}
|
||||
|
||||
private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors error) {
|
||||
return prepareMetadataResponse(cluster, error, error);
|
||||
}
|
||||
|
||||
private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors topicError, Errors partitionError) {
|
||||
List<MetadataResponseTopic> metadata = new ArrayList<>();
|
||||
for (String topic : cluster.topics()) {
|
||||
List<MetadataResponsePartition> pms = new ArrayList<>();
|
||||
for (PartitionInfo pInfo : cluster.availablePartitionsForTopic(topic)) {
|
||||
MetadataResponsePartition pm = new MetadataResponsePartition()
|
||||
.setErrorCode(error.code())
|
||||
.setErrorCode(partitionError.code())
|
||||
.setPartitionIndex(pInfo.partition())
|
||||
.setLeaderId(pInfo.leader().id())
|
||||
.setLeaderEpoch(234)
|
||||
|
|
@ -573,7 +579,7 @@ public class KafkaAdminClientTest {
|
|||
pms.add(pm);
|
||||
}
|
||||
MetadataResponseTopic tm = new MetadataResponseTopic()
|
||||
.setErrorCode(error.code())
|
||||
.setErrorCode(topicError.code())
|
||||
.setName(topic)
|
||||
.setIsInternal(false)
|
||||
.setPartitions(pms);
|
||||
|
|
@ -5462,7 +5468,6 @@ public class KafkaAdminClientTest {
|
|||
|
||||
@Test
|
||||
public void testListOffsetsMetadataRetriableErrors() throws Exception {
|
||||
|
||||
Node node0 = new Node(0, "localhost", 8120);
|
||||
Node node1 = new Node(1, "localhost", 8121);
|
||||
List<Node> nodes = Arrays.asList(node0, node1);
|
||||
|
|
@ -5485,7 +5490,8 @@ public class KafkaAdminClientTest {
|
|||
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
|
||||
|
||||
env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.LEADER_NOT_AVAILABLE));
|
||||
env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.UNKNOWN_TOPIC_OR_PARTITION));
|
||||
// We retry when a partition of a topic (but not the topic itself) is unknown
|
||||
env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE, Errors.UNKNOWN_TOPIC_OR_PARTITION));
|
||||
env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
|
||||
|
||||
// listoffsets response from broker 0
|
||||
|
|
@ -5636,9 +5642,13 @@ public class KafkaAdminClientTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListOffsetsMetadataNonRetriableErrors() throws Exception {
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("listOffsetsMetadataNonRetriableErrors")
|
||||
public void testListOffsetsMetadataNonRetriableErrors(
|
||||
Errors topicMetadataError,
|
||||
Errors partitionMetadataError,
|
||||
Class<? extends Throwable> expectedFailure
|
||||
) throws Exception {
|
||||
Node node0 = new Node(0, "localhost", 8120);
|
||||
Node node1 = new Node(1, "localhost", 8121);
|
||||
List<Node> nodes = Arrays.asList(node0, node1);
|
||||
|
|
@ -5654,20 +5664,51 @@ public class KafkaAdminClientTest {
|
|||
node0);
|
||||
|
||||
final TopicPartition tp1 = new TopicPartition("foo", 0);
|
||||
final MetadataResponse preparedResponse = prepareMetadataResponse(
|
||||
cluster, topicMetadataError, partitionMetadataError
|
||||
);
|
||||
|
||||
try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
|
||||
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
|
||||
|
||||
env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.TOPIC_AUTHORIZATION_FAILED));
|
||||
env.kafkaClient().prepareResponse(preparedResponse);
|
||||
|
||||
Map<TopicPartition, OffsetSpec> partitions = new HashMap<>();
|
||||
partitions.put(tp1, OffsetSpec.latest());
|
||||
ListOffsetsResult result = env.adminClient().listOffsets(partitions);
|
||||
|
||||
TestUtils.assertFutureError(result.all(), TopicAuthorizationException.class);
|
||||
TestUtils.assertFutureError(result.all(), expectedFailure);
|
||||
}
|
||||
}
|
||||
|
||||
private static Stream<Arguments> listOffsetsMetadataNonRetriableErrors() {
|
||||
return Stream.of(
|
||||
Arguments.of(
|
||||
Errors.TOPIC_AUTHORIZATION_FAILED,
|
||||
Errors.TOPIC_AUTHORIZATION_FAILED,
|
||||
TopicAuthorizationException.class
|
||||
),
|
||||
Arguments.of(
|
||||
// We fail fast when the entire topic is unknown...
|
||||
Errors.UNKNOWN_TOPIC_OR_PARTITION,
|
||||
Errors.NONE,
|
||||
UnknownTopicOrPartitionException.class
|
||||
),
|
||||
Arguments.of(
|
||||
// ... even if a partition in the topic is also somehow reported as unknown...
|
||||
Errors.UNKNOWN_TOPIC_OR_PARTITION,
|
||||
Errors.UNKNOWN_TOPIC_OR_PARTITION,
|
||||
UnknownTopicOrPartitionException.class
|
||||
),
|
||||
Arguments.of(
|
||||
// ... or a partition in the topic has a different, otherwise-retriable error
|
||||
Errors.UNKNOWN_TOPIC_OR_PARTITION,
|
||||
Errors.LEADER_NOT_AVAILABLE,
|
||||
UnknownTopicOrPartitionException.class
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListOffsetsPartialResponse() throws Exception {
|
||||
Node node0 = new Node(0, "localhost", 8120);
|
||||
|
|
|
|||
Loading…
Reference in New Issue