KAFKA-16427 KafkaConsumer#position() does not respect timeout when group protocol is CONSUMER (#15843)

The AsyncKafkaConsumer implementation of position(TopicPartition, Duration) was not updating its internal Timer, causing it to execute the loop forever. Adding a call to update the Timer at the bottom of the loop fixes the issue.

An integration test was added to catch this case; it fails without the newly added call to Timer.update(long).

Reviewers: Lianet Magrans <lianetmr@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Kirk True 2024-05-03 19:29:27 -07:00 committed by GitHub
parent 1fd39150aa
commit 9b8aac22ec
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 15 additions and 1 deletions

View File

@ -906,6 +906,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
return position.offset;
updateFetchPositions(timer);
timer.update();
} while (timer.notExpired());
throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the position " +

View File

@ -23,7 +23,7 @@ import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.{KafkaException, MetricName, TopicPartition}
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.{InvalidGroupIdException, InvalidTopicException}
import org.apache.kafka.common.errors.{InvalidGroupIdException, InvalidTopicException, TimeoutException}
import org.apache.kafka.common.header.Headers
import org.apache.kafka.common.record.{CompressionType, TimestampType}
import org.apache.kafka.common.serialization._
@ -859,4 +859,17 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertEquals(20, timestampTp1.timestamp)
assertEquals(Optional.of(0), timestampTp1.leaderEpoch)
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
@Timeout(15)
def testPositionRespectsTimeout(quorum: String, groupProtocol: String): Unit = {
val topicPartition = new TopicPartition(topic, 15)
val consumer = createConsumer()
consumer.assign(List(topicPartition).asJava)
// When position() is called for a topic/partition that doesn't exist, the consumer will repeatedly update the
// local metadata. However, it should give up after the user-supplied timeout has past.
assertThrows(classOf[TimeoutException], () => consumer.position(topicPartition, Duration.ofSeconds(3)))
}
}