KAFKA-13637: Use default.api.timeout.ms as default timeout value for KafkaConsumer.endOffsets (#11726)

We introduced `default.api.timeout.ms` in 53ca52f855 but we missed updating `KafkaConsumer.endOffsets` which still use `request.timeout.ms`. This patch fixes this.

Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
dengziming 2022-02-03 17:32:25 +08:00 committed by GitHub
parent 319732dbeb
commit af6a9a17bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 58 additions and 4 deletions

View File

@ -2197,11 +2197,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
* @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
* the amount of time allocated by {@code request.timeout.ms} expires
* the amount of time allocated by {@code default.api.timeout.ms} expires
*/
@Override
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
return endOffsets(partitions, Duration.ofMillis(requestTimeoutMs));
return endOffsets(partitions, Duration.ofMillis(defaultApiTimeoutMs));
}
/**

View File

@ -164,6 +164,8 @@ public class KafkaConsumerTest {
private final TopicPartition t3p0 = new TopicPartition(topic3, 0);
private final int sessionTimeoutMs = 10000;
private final int defaultApiTimeoutMs = 60000;
private final int requestTimeoutMs = defaultApiTimeoutMs / 2;
private final int heartbeatIntervalMs = 1000;
// Set auto commit interval lower than heartbeat so we don't need to deal with
@ -2618,8 +2620,6 @@ public class KafkaConsumerTest {
String clientId = "mock-consumer";
String metricGroupPrefix = "consumer";
long retryBackoffMs = 100;
int requestTimeoutMs = 30000;
int defaultApiTimeoutMs = 30000;
int minBytes = 1;
int maxBytes = Integer.MAX_VALUE;
int maxWaitMs = 500;
@ -2948,6 +2948,60 @@ public class KafkaConsumerTest {
() -> new KafkaConsumer<>(configs, new StringDeserializer(), new StringDeserializer()));
}
@Test
public void testOffsetsForTimesTimeout() {
final KafkaConsumer<String, String> consumer = consumerForCheckingTimeoutException();
assertEquals(
"Failed to get offsets by times in 60000ms",
assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> consumer.offsetsForTimes(singletonMap(tp0, 0L))).getMessage()
);
}
@Test
public void testBeginningOffsetsTimeout() {
final KafkaConsumer<String, String> consumer = consumerForCheckingTimeoutException();
assertEquals(
"Failed to get offsets by times in 60000ms",
assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> consumer.beginningOffsets(singletonList(tp0))).getMessage()
);
}
@Test
public void testEndOffsetsTimeout() {
final KafkaConsumer<String, String> consumer = consumerForCheckingTimeoutException();
assertEquals(
"Failed to get offsets by times in 60000ms",
assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> consumer.endOffsets(singletonList(tp0))).getMessage()
);
}
private KafkaConsumer<String, String> consumerForCheckingTimeoutException() {
final Time time = new MockTime();
SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, singletonMap(topic, 1));
ConsumerPartitionAssignor assignor = new RangeAssignor();
final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
for (int i = 0; i < 10; i++) {
client.prepareResponse(
request -> {
time.sleep(defaultApiTimeoutMs / 10);
return request instanceof ListOffsetsRequest;
},
listOffsetsResponse(
Collections.emptyMap(),
Collections.singletonMap(tp0, Errors.UNKNOWN_TOPIC_OR_PARTITION)
));
}
return consumer;
}
private static final List<String> CLIENT_IDS = new ArrayList<>();
public static class DeserializerForClientId implements Deserializer<byte[]> {
@Override