KAFKA-16659 KafkaConsumer#position() does not respect wakup when group protocol is CONSUMER (#15853)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
PoAn Yang 2024-05-06 08:45:11 +08:00 committed by GitHub
parent 25118cec14
commit 970ac07881
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 40 additions and 1 deletions

View File

@ -907,6 +907,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
updateFetchPositions(timer);
timer.update();
wakeupTrigger.maybeTriggerWakeup();
} while (timer.notExpired());
throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the position " +
@ -1703,12 +1704,15 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
new FetchCommittedOffsetsEvent(
initializingPartitions,
timer);
wakeupTrigger.setActiveTask(event.future());
final Map<TopicPartition, OffsetAndMetadata> offsets = applicationEventHandler.addAndGet(event, timer);
refreshCommittedOffsets(offsets, metadata, subscriptions);
return true;
} catch (TimeoutException e) {
log.error("Couldn't refresh committed offsets before timeout expired");
return false;
} finally {
wakeupTrigger.clearTask();
}
}

View File

@ -23,7 +23,7 @@ import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.{KafkaException, MetricName, TopicPartition}
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.{InvalidGroupIdException, InvalidTopicException, TimeoutException}
import org.apache.kafka.common.errors.{InvalidGroupIdException, InvalidTopicException, TimeoutException, WakeupException}
import org.apache.kafka.common.header.Headers
import org.apache.kafka.common.record.{CompressionType, TimestampType}
import org.apache.kafka.common.serialization._
@ -33,6 +33,7 @@ import org.junit.jupiter.api.Timeout
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{CsvSource, MethodSource}
import java.util.concurrent.{CompletableFuture, TimeUnit}
import scala.jdk.CollectionConverters._
@Timeout(600)
@ -872,4 +873,38 @@ class PlaintextConsumerTest extends BaseConsumerTest {
// local metadata. However, it should give up after the user-supplied timeout has past.
assertThrows(classOf[TimeoutException], () => consumer.position(topicPartition, Duration.ofSeconds(3)))
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
@Timeout(15)
def testPositionRespectsWakeup(quorum: String, groupProtocol: String): Unit = {
val topicPartition = new TopicPartition(topic, 15)
val consumer = createConsumer()
consumer.assign(List(topicPartition).asJava)
CompletableFuture.runAsync { () =>
TimeUnit.SECONDS.sleep(1)
consumer.wakeup()
}
assertThrows(classOf[WakeupException], () => consumer.position(topicPartition, Duration.ofSeconds(3)))
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
@Timeout(15)
def testPositionWithErrorConnectionRespectsWakeup(quorum: String, groupProtocol: String): Unit = {
val topicPartition = new TopicPartition(topic, 15)
val properties = new Properties()
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:12345") // make sure the connection fails
val consumer = createConsumer(configOverrides = properties)
consumer.assign(List(topicPartition).asJava)
CompletableFuture.runAsync { () =>
TimeUnit.SECONDS.sleep(1)
consumer.wakeup()
}
assertThrows(classOf[WakeupException], () => consumer.position(topicPartition, Duration.ofSeconds(100)))
}
}