From 970ac0788122e3163c34e54a8e0510ab66329cc4 Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Mon, 6 May 2024 08:45:11 +0800 Subject: [PATCH] KAFKA-16659 KafkaConsumer#position() does not respect wakup when group protocol is CONSUMER (#15853) Reviewers: Chia-Ping Tsai --- .../internals/AsyncKafkaConsumer.java | 4 ++ .../kafka/api/PlaintextConsumerTest.scala | 37 ++++++++++++++++++- 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 3395bf62489..02d97a9312f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -907,6 +907,7 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { updateFetchPositions(timer); timer.update(); + wakeupTrigger.maybeTriggerWakeup(); } while (timer.notExpired()); throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the position " + @@ -1703,12 +1704,15 @@ public class AsyncKafkaConsumer implements ConsumerDelegate { new FetchCommittedOffsetsEvent( initializingPartitions, timer); + wakeupTrigger.setActiveTask(event.future()); final Map offsets = applicationEventHandler.addAndGet(event, timer); refreshCommittedOffsets(offsets, metadata, subscriptions); return true; } catch (TimeoutException e) { log.error("Couldn't refresh committed offsets before timeout expired"); return false; + } finally { + wakeupTrigger.clearTask(); } } diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index 8503cad0816..7d32d1bd3e0 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -23,7 +23,7 @@ import org.apache.kafka.clients.consumer._ import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord} import org.apache.kafka.common.{KafkaException, MetricName, TopicPartition} import org.apache.kafka.common.config.TopicConfig -import org.apache.kafka.common.errors.{InvalidGroupIdException, InvalidTopicException, TimeoutException} +import org.apache.kafka.common.errors.{InvalidGroupIdException, InvalidTopicException, TimeoutException, WakeupException} import org.apache.kafka.common.header.Headers import org.apache.kafka.common.record.{CompressionType, TimestampType} import org.apache.kafka.common.serialization._ @@ -33,6 +33,7 @@ import org.junit.jupiter.api.Timeout import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{CsvSource, MethodSource} +import java.util.concurrent.{CompletableFuture, TimeUnit} import scala.jdk.CollectionConverters._ @Timeout(600) @@ -872,4 +873,38 @@ class PlaintextConsumerTest extends BaseConsumerTest { // local metadata. However, it should give up after the user-supplied timeout has past. assertThrows(classOf[TimeoutException], () => consumer.position(topicPartition, Duration.ofSeconds(3))) } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + @Timeout(15) + def testPositionRespectsWakeup(quorum: String, groupProtocol: String): Unit = { + val topicPartition = new TopicPartition(topic, 15) + val consumer = createConsumer() + consumer.assign(List(topicPartition).asJava) + + CompletableFuture.runAsync { () => + TimeUnit.SECONDS.sleep(1) + consumer.wakeup() + } + + assertThrows(classOf[WakeupException], () => consumer.position(topicPartition, Duration.ofSeconds(3))) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + @Timeout(15) + def testPositionWithErrorConnectionRespectsWakeup(quorum: String, groupProtocol: String): Unit = { + val topicPartition = new TopicPartition(topic, 15) + val properties = new Properties() + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:12345") // make sure the connection fails + val consumer = createConsumer(configOverrides = properties) + consumer.assign(List(topicPartition).asJava) + + CompletableFuture.runAsync { () => + TimeUnit.SECONDS.sleep(1) + consumer.wakeup() + } + + assertThrows(classOf[WakeupException], () => consumer.position(topicPartition, Duration.ofSeconds(100))) + } }