From 9b8aac22ec7ce927a2ceb2bfe7afd57419ee946c Mon Sep 17 00:00:00 2001 From: Kirk True Date: Fri, 3 May 2024 19:29:27 -0700 Subject: [PATCH] KAFKA-16427 KafkaConsumer#position() does not respect timeout when group protocol is CONSUMER (#15843) The AsyncKafkaConsumer implementation of position(TopicPartition, Duration) was not updating its internal Timer, causing it to execute the loop forever. Adding a call to update the Timer at the bottom of the loop fixes the issue. An integration test was added to catch this case; it fails without the newly added call to Timer.update(long). Reviewers: Lianet Magrans , Chia-Ping Tsai --- .../consumer/internals/AsyncKafkaConsumer.java | 1 + .../kafka/api/PlaintextConsumerTest.scala | 15 ++++++++++++++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index e117995e6cb..3395bf62489 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -906,6 +906,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { return position.offset; updateFetchPositions(timer); + timer.update(); } while (timer.notExpired()); throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the position " + diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index 8995ea5657d..8503cad0816 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -23,7 +23,7 @@ import org.apache.kafka.clients.consumer._ import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord} import org.apache.kafka.common.{KafkaException, MetricName, TopicPartition} import org.apache.kafka.common.config.TopicConfig -import org.apache.kafka.common.errors.{InvalidGroupIdException, InvalidTopicException} +import org.apache.kafka.common.errors.{InvalidGroupIdException, InvalidTopicException, TimeoutException} import org.apache.kafka.common.header.Headers import org.apache.kafka.common.record.{CompressionType, TimestampType} import org.apache.kafka.common.serialization._ @@ -859,4 +859,17 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(20, timestampTp1.timestamp) assertEquals(Optional.of(0), timestampTp1.leaderEpoch) } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + @Timeout(15) + def testPositionRespectsTimeout(quorum: String, groupProtocol: String): Unit = { + val topicPartition = new TopicPartition(topic, 15) + val consumer = createConsumer() + consumer.assign(List(topicPartition).asJava) + + // When position() is called for a topic/partition that doesn't exist, the consumer will repeatedly update the + // local metadata. However, it should give up after the user-supplied timeout has past. + assertThrows(classOf[TimeoutException], () => consumer.position(topicPartition, Duration.ofSeconds(3))) + } }