From a399852cedf60382ef1f311c37abaecc2e5e59cd Mon Sep 17 00:00:00 2001 From: Ken Huang Date: Tue, 8 Jul 2025 01:41:59 +0800 Subject: [PATCH] KAFKA-19042 Move PlaintextConsumerTest to client-integration-tests module (#20081) Use Java to rewrite PlaintextConsumerTest by new test infra and move it to client-integration-tests module. Reviewers: Jhen-Yung Hsu , TengYao Chi , Chia-Ping Tsai --- build.gradle | 1 + .../kafka/clients/ClientsTestUtils.java | 49 +- .../consumer/PlaintextConsumerTest.java | 1640 +++++++++++++++++ .../kafka/api/BaseConsumerTest.scala | 40 +- .../kafka/api/PlaintextConsumerTest.scala | 821 +-------- 5 files changed, 1681 insertions(+), 870 deletions(-) create mode 100644 clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java diff --git a/build.gradle b/build.gradle index 1ba506e6314..6f0288037a7 100644 --- a/build.gradle +++ b/build.gradle @@ -2001,6 +2001,7 @@ project(':clients:clients-integration-tests') { implementation project(':group-coordinator') implementation project(':group-coordinator:group-coordinator-api') implementation project(':transaction-coordinator') + testImplementation project(':test-common:test-common-util') testImplementation libs.junitJupiter testImplementation libs.junitPlatformSuiteEngine diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java index 0c99d51da88..dfdadeb5090 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java @@ -64,19 +64,19 @@ public class ClientsTestUtils { private ClientsTestUtils() {} - public static List> consumeRecords( - Consumer consumer, + public static List> consumeRecords( + Consumer consumer, int numRecords ) throws InterruptedException { return consumeRecords(consumer, numRecords, Integer.MAX_VALUE); } - public static List> consumeRecords( - Consumer consumer, + public static List> consumeRecords( + Consumer consumer, int numRecords, int maxPollRecords ) throws InterruptedException { - List> consumedRecords = new ArrayList<>(); + List> consumedRecords = new ArrayList<>(); TestUtils.waitForCondition(() -> { var records = consumer.poll(Duration.ofMillis(100)); records.forEach(consumedRecords::add); @@ -138,6 +138,31 @@ public class ClientsTestUtils { }, waitTimeMs, msg); } + public static void consumeAndVerifyRecordsWithTimeTypeLogAppend( + Consumer consumer, + TopicPartition tp, + int numRecords, + long startingTimestamp + ) throws InterruptedException { + var records = consumeRecords(consumer, numRecords, Integer.MAX_VALUE); + var now = System.currentTimeMillis(); + for (var i = 0; i < numRecords; i++) { + var record = records.get(i); + assertEquals(tp.topic(), record.topic()); + assertEquals(tp.partition(), record.partition()); + + assertTrue(record.timestamp() >= startingTimestamp && record.timestamp() <= now, + "Got unexpected timestamp " + record.timestamp() + ". Timestamp should be between [" + startingTimestamp + ", " + now + "]"); + + assertEquals(i, record.offset()); + assertEquals(KEY_PREFIX + i, new String(record.key())); + assertEquals(VALUE_PREFIX + i, new String(record.value())); + // this is true only because K and V are byte arrays + assertEquals((KEY_PREFIX + i).length(), record.serializedKeySize()); + assertEquals((VALUE_PREFIX + i).length(), record.serializedValueSize()); + } + } + public static void consumeAndVerifyRecords( Consumer consumer, TopicPartition tp, @@ -281,8 +306,8 @@ public class ClientsTestUtils { return record; } - public static void sendAndAwaitAsyncCommit( - Consumer consumer, + public static void sendAndAwaitAsyncCommit( + Consumer consumer, Optional> offsetsOpt ) throws InterruptedException { @@ -423,8 +448,8 @@ public class ClientsTestUtils { } } - public static void sendAsyncCommit( - Consumer consumer, + public static void sendAsyncCommit( + Consumer consumer, OffsetCommitCallback callback, Optional> offsetsOpt ) { @@ -472,14 +497,14 @@ public class ClientsTestUtils { } } - private static class RetryCommitCallback implements OffsetCommitCallback { + private static class RetryCommitCallback implements OffsetCommitCallback { boolean isComplete = false; Optional error = Optional.empty(); - Consumer consumer; + Consumer consumer; Optional> offsetsOpt; public RetryCommitCallback( - Consumer consumer, + Consumer consumer, Optional> offsetsOpt ) { this.consumer = consumer; diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java new file mode 100644 index 00000000000..5fd2ad20089 --- /dev/null +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java @@ -0,0 +1,1640 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import kafka.server.KafkaBroker; + +import org.apache.kafka.clients.ClientsTestUtils.TestConsumerReassignmentListener; +import org.apache.kafka.clients.admin.NewPartitions; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.errors.InvalidGroupIdException; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Flaky; +import org.apache.kafka.common.test.api.Type; +import org.apache.kafka.server.quota.QuotaType; +import org.apache.kafka.test.MockConsumerInterceptor; +import org.apache.kafka.test.MockProducerInterceptor; +import org.apache.kafka.test.TestUtils; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.IntStream; + +import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.BROKER_COUNT; +import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.TOPIC; +import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.TP; +import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.testClusterResourceListener; +import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.testCoordinatorFailover; +import static org.apache.kafka.clients.ClientsTestUtils.BaseConsumerTestcase.testSimpleConsumption; +import static org.apache.kafka.clients.ClientsTestUtils.awaitAssignment; +import static org.apache.kafka.clients.ClientsTestUtils.awaitRebalance; +import static org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecords; +import static org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecordsWithTimeTypeLogAppend; +import static org.apache.kafka.clients.ClientsTestUtils.consumeRecords; +import static org.apache.kafka.clients.ClientsTestUtils.sendAndAwaitAsyncCommit; +import static org.apache.kafka.clients.ClientsTestUtils.sendRecords; +import static org.apache.kafka.clients.CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG; +import static org.apache.kafka.clients.CommonClientConfigs.METADATA_MAX_AGE_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.CLIENT_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_INSTANCE_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@ClusterTestDefaults( + types = {Type.KRAFT}, + brokers = BROKER_COUNT, + serverProperties = { + @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + @ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, value = "100"), + @ClusterConfigProperty(key = GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, value = "60000"), + @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "10"), + } +) +public class PlaintextConsumerTest { + + private final ClusterInstance cluster; + public static final double EPSILON = 0.1; + + public PlaintextConsumerTest(ClusterInstance cluster) { + this.cluster = cluster; + } + + @ClusterTest + public void testClassicConsumerSimpleConsumption() throws InterruptedException { + testSimpleConsumption(cluster, Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerSimpleConsumption() throws InterruptedException { + testSimpleConsumption(cluster, Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testClassicConsumerClusterResourceListener() throws InterruptedException { + testClusterResourceListener(cluster, Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerClusterResourceListener() throws InterruptedException { + testClusterResourceListener(cluster, Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testClassicConsumerCoordinatorFailover() throws InterruptedException { + Map config = Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + SESSION_TIMEOUT_MS_CONFIG, 5001, + HEARTBEAT_INTERVAL_MS_CONFIG, 1000, + // Use higher poll timeout to avoid consumer leaving the group due to timeout + MAX_POLL_INTERVAL_MS_CONFIG, 15000 + ); + testCoordinatorFailover(cluster, config); + } + + @ClusterTest + public void testAsyncConsumeCoordinatorFailover() throws InterruptedException { + Map config = Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + // Use higher poll timeout to avoid consumer leaving the group due to timeout + MAX_POLL_INTERVAL_MS_CONFIG, 15000 + ); + testCoordinatorFailover(cluster, config); + } + + @ClusterTest + public void testClassicConsumerHeaders() throws Exception { + testHeaders(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerHeaders() throws Exception { + testHeaders(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testHeaders(Map consumerConfig) throws Exception { + var numRecords = 1; + + try (Producer producer = cluster.producer(); + Consumer consumer = cluster.consumer(consumerConfig) + ) { + var record = new ProducerRecord<>(TP.topic(), TP.partition(), null, "key".getBytes(), "value".getBytes()); + record.headers().add("headerKey", "headerValue".getBytes()); + producer.send(record); + + assertEquals(0, consumer.assignment().size()); + consumer.assign(List.of(TP)); + assertEquals(1, consumer.assignment().size()); + + consumer.seek(TP, 0); + var records = consumeRecords(consumer, numRecords); + assertEquals(numRecords, records.size()); + var header = records.get(0).headers().lastHeader("headerKey"); + assertEquals("headerValue", header == null ? null : new String(header.value())); + } + } + + @ClusterTest + public void testClassicConsumerHeadersSerializerDeserializer() throws Exception { + testHeadersSerializeDeserialize(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerHeadersSerializerDeserializer() throws Exception { + testHeadersSerializeDeserialize(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testHeadersSerializeDeserialize(Map config) throws InterruptedException { + var numRecords = 1; + Map consumerConfig = new HashMap<>(config); + consumerConfig.put(VALUE_DESERIALIZER_CLASS_CONFIG, DeserializerImpl.class); + Map producerConfig = Map.of( + VALUE_SERIALIZER_CLASS_CONFIG, SerializerImpl.class.getName() + ); + + try (Producer producer = cluster.producer(producerConfig); + Consumer consumer = cluster.consumer(consumerConfig) + ) { + producer.send(new ProducerRecord<>( + TP.topic(), + TP.partition(), + null, + "key".getBytes(), + "value".getBytes()) + ); + + assertEquals(0, consumer.assignment().size()); + consumer.assign(List.of(TP)); + assertEquals(1, consumer.assignment().size()); + + consumer.seek(TP, 0); + assertEquals(numRecords, consumeRecords(consumer, numRecords).size()); + } + } + + @ClusterTest + public void testClassicConsumerAutoOffsetReset() throws Exception { + testAutoOffsetReset(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerAutoOffsetReset() throws Exception { + testAutoOffsetReset(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testAutoOffsetReset(Map consumerConfig) throws Exception { + try (Producer producer = cluster.producer(); + Consumer consumer = cluster.consumer(consumerConfig) + ) { + var startingTimestamp = System.currentTimeMillis(); + sendRecords(producer, TP, 1, startingTimestamp); + consumer.assign(List.of(TP)); + consumeAndVerifyRecords(consumer, TP, 1, 0, 0, startingTimestamp); + } + } + + @ClusterTest + public void testClassicConsumerGroupConsumption() throws Exception { + testGroupConsumption(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerGroupConsumption() throws Exception { + testGroupConsumption(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testGroupConsumption(Map consumerConfig) throws Exception { + try (Producer producer = cluster.producer(); + Consumer consumer = cluster.consumer(consumerConfig) + ) { + var startingTimestamp = System.currentTimeMillis(); + sendRecords(producer, TP, 10, startingTimestamp); + consumer.subscribe(List.of(TOPIC)); + consumeAndVerifyRecords(consumer, TP, 1, 0, 0, startingTimestamp); + } + } + + @ClusterTest + public void testClassicConsumerPartitionsFor() throws Exception { + testPartitionsFor(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerPartitionsFor() throws Exception { + testPartitionsFor(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testPartitionsFor(Map consumerConfig) throws Exception { + var numParts = 2; + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + cluster.createTopic("part-test", numParts, (short) 1); + + try (var consumer = cluster.consumer(consumerConfig)) { + var partitions = consumer.partitionsFor(TOPIC); + assertNotNull(partitions); + assertEquals(2, partitions.size()); + } + } + + @ClusterTest + public void testClassicConsumerPartitionsForAutoCreate() throws Exception { + testPartitionsForAutoCreate(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerPartitionsForAutoCreate() throws Exception { + testPartitionsForAutoCreate(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testPartitionsForAutoCreate(Map consumerConfig) throws Exception { + try (var consumer = cluster.consumer(consumerConfig)) { + // First call would create the topic + consumer.partitionsFor("non-exist-topic"); + TestUtils.waitForCondition( + () -> !consumer.partitionsFor("non-exist-topic").isEmpty(), + "Timed out while awaiting non empty partitions." + ); + } + } + + @ClusterTest + public void testClassicConsumerPartitionsForInvalidTopic() { + testPartitionsForInvalidTopic(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerPartitionsForInvalidTopic() { + testPartitionsForInvalidTopic(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testPartitionsForInvalidTopic(Map consumerConfig) { + try (var consumer = cluster.consumer(consumerConfig)) { + assertThrows(InvalidTopicException.class, () -> consumer.partitionsFor(";3# ads,{234")); + } + } + + @ClusterTest + public void testClassicConsumerSeek() throws Exception { + testSeek( + Map.of(GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerSeek() throws Exception { + testSeek( + Map.of(GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testSeek(Map consumerConfig) throws Exception { + var totalRecords = 50; + var mid = totalRecords / 2; + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + + try (Producer producer = cluster.producer(); + Consumer consumer = cluster.consumer(consumerConfig) + ) { + var startingTimestamp = 0; + sendRecords(producer, TP, totalRecords, startingTimestamp); + + consumer.assign(List.of(TP)); + consumer.seekToEnd(List.of(TP)); + assertEquals(totalRecords, consumer.position(TP)); + assertTrue(consumer.poll(Duration.ofMillis(50)).isEmpty()); + + consumer.seekToBeginning(List.of(TP)); + assertEquals(0, consumer.position(TP)); + consumeAndVerifyRecords(consumer, TP, 1, 0, 0, startingTimestamp); + + consumer.seek(TP, mid); + assertEquals(mid, consumer.position(TP)); + + consumeAndVerifyRecords(consumer, TP, 1, mid, mid, mid); + + // Test seek compressed message + var tp2 = new TopicPartition(TOPIC, 1); + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + sendCompressedMessages(totalRecords, tp2); + consumer.assign(List.of(tp2)); + + consumer.seekToEnd(List.of(tp2)); + assertEquals(totalRecords, consumer.position(tp2)); + assertTrue(consumer.poll(Duration.ofMillis(50)).isEmpty()); + + consumer.seekToBeginning(List.of(tp2)); + assertEquals(0L, consumer.position(tp2)); + consumeAndVerifyRecords(consumer, tp2, 1, 0); + + consumer.seek(tp2, mid); + assertEquals(mid, consumer.position(tp2)); + consumeAndVerifyRecords(consumer, tp2, 1, mid, mid, mid); + } + } + + @ClusterTest + public void testClassicConsumerPartitionPauseAndResume() throws Exception { + testPartitionPauseAndResume(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerPartitionPauseAndResume() throws Exception { + testPartitionPauseAndResume(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testPartitionPauseAndResume(Map consumerConfig) throws Exception { + var partitions = List.of(TP); + var numRecords = 5; + + try (Producer producer = cluster.producer(); + Consumer consumer = cluster.consumer(consumerConfig) + ) { + var startingTimestamp = System.currentTimeMillis(); + sendRecords(producer, TP, numRecords, startingTimestamp); + + consumer.assign(partitions); + consumeAndVerifyRecords(consumer, TP, numRecords, 0, 0, startingTimestamp); + consumer.pause(partitions); + startingTimestamp = System.currentTimeMillis(); + sendRecords(producer, TP, numRecords, startingTimestamp); + assertTrue(consumer.poll(Duration.ofMillis(100)).isEmpty()); + consumer.resume(partitions); + consumeAndVerifyRecords(consumer, TP, numRecords, 5, 0, startingTimestamp); + } + } + + @ClusterTest + public void testClassicConsumerInterceptors() throws Exception { + testInterceptors(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerInterceptors() throws Exception { + testInterceptors(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testInterceptors(Map consumerConfig) throws Exception { + var appendStr = "mock"; + MockConsumerInterceptor.resetCounters(); + MockProducerInterceptor.resetCounters(); + + // create producer with interceptor + Map producerConfig = Map.of( + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MockProducerInterceptor.class.getName(), + KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), + VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), + "mock.interceptor.append", appendStr + ); + // create consumer with interceptor + Map consumerConfigOverride = new HashMap<>(consumerConfig); + consumerConfigOverride.put(INTERCEPTOR_CLASSES_CONFIG, MockConsumerInterceptor.class.getName()); + consumerConfigOverride.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerConfigOverride.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + + try (Producer producer = cluster.producer(producerConfig); + Consumer consumer = cluster.consumer(consumerConfigOverride) + ) { + // produce records + var numRecords = 10; + List> futures = new ArrayList<>(); + for (var i = 0; i < numRecords; i++) { + Future future = producer.send( + new ProducerRecord<>(TP.topic(), TP.partition(), "key " + i, "value " + i) + ); + futures.add(future); + } + + // Wait for all sends to complete + futures.forEach(future -> assertDoesNotThrow(() -> future.get())); + + assertEquals(numRecords, MockProducerInterceptor.ONSEND_COUNT.intValue()); + assertEquals(numRecords, MockProducerInterceptor.ON_SUCCESS_COUNT.intValue()); + + // send invalid record + assertThrows( + Throwable.class, + () -> producer.send(null), + "Should not allow sending a null record" + ); + assertEquals( + 1, + MockProducerInterceptor.ON_ERROR_COUNT.intValue(), + "Interceptor should be notified about exception" + ); + assertEquals( + 0, + MockProducerInterceptor.ON_ERROR_WITH_METADATA_COUNT.intValue(), + "Interceptor should not receive metadata with an exception when record is null" + ); + + consumer.assign(List.of(TP)); + consumer.seek(TP, 0); + + // consume and verify that values are modified by interceptors + var records = consumeRecords(consumer, numRecords); + for (var i = 0; i < numRecords; i++) { + ConsumerRecord record = records.get(i); + assertEquals("key " + i, record.key()); + assertEquals(("value " + i + appendStr).toUpperCase(Locale.ROOT), record.value()); + } + + // commit sync and verify onCommit is called + var commitCountBefore = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue(); + consumer.commitSync(Map.of(TP, new OffsetAndMetadata(2L))); + assertEquals(2, consumer.committed(Set.of(TP)).get(TP).offset()); + assertEquals(commitCountBefore + 1, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue()); + + // commit async and verify onCommit is called + var offsetsToCommit = Map.of(TP, new OffsetAndMetadata(5L)); + sendAndAwaitAsyncCommit(consumer, Optional.of(offsetsToCommit)); + assertEquals(5, consumer.committed(Set.of(TP)).get(TP).offset()); + assertEquals(commitCountBefore + 2, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue()); + } + // cleanup + MockConsumerInterceptor.resetCounters(); + MockProducerInterceptor.resetCounters(); + } + + @ClusterTest + public void testClassicConsumerInterceptorsWithWrongKeyValue() throws Exception { + testInterceptorsWithWrongKeyValue(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerInterceptorsWithWrongKeyValue() throws Exception { + testInterceptorsWithWrongKeyValue(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testInterceptorsWithWrongKeyValue(Map consumerConfig) throws Exception { + var appendStr = "mock"; + // create producer with interceptor that has different key and value types from the producer + Map producerConfig = Map.of( + ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MockProducerInterceptor.class.getName(), + "mock.interceptor.append", appendStr + ); + // create consumer with interceptor that has different key and value types from the consumer + Map consumerConfigOverride = new HashMap<>(consumerConfig); + consumerConfigOverride.put(INTERCEPTOR_CLASSES_CONFIG, MockConsumerInterceptor.class.getName()); + + try (Producer producer = cluster.producer(producerConfig); + Consumer consumer = cluster.consumer(consumerConfigOverride) + ) { + // producing records should succeed + producer.send(new ProducerRecord<>( + TP.topic(), + TP.partition(), + "key".getBytes(), + "value will not be modified".getBytes() + )); + + consumer.assign(List.of(TP)); + consumer.seek(TP, 0); + // consume and verify that values are not modified by interceptors -- their exceptions are caught and logged, but not propagated + var records = consumeRecords(consumer, 1); + var record = records.get(0); + assertEquals("value will not be modified", new String(record.value())); + } + } + + @ClusterTest + public void testClassicConsumerConsumeMessagesWithCreateTime() throws Exception { + testConsumeMessagesWithCreateTime(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerConsumeMessagesWithCreateTime() throws Exception { + testConsumeMessagesWithCreateTime(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testConsumeMessagesWithCreateTime(Map consumerConfig) throws Exception { + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + var numRecords = 50; + var tp2 = new TopicPartition(TOPIC, 1); + + try (Producer producer = cluster.producer(); + Consumer consumer = cluster.consumer(consumerConfig) + ) { + // Test non-compressed messages + var startingTimestamp = System.currentTimeMillis(); + sendRecords(producer, TP, numRecords, startingTimestamp); + consumer.assign(List.of(TP)); + consumeAndVerifyRecords(consumer, TP, numRecords, 0, 0, startingTimestamp); + + // Test compressed messages + sendCompressedMessages(numRecords, tp2); + consumer.assign(List.of(tp2)); + consumeAndVerifyRecords(consumer, tp2, numRecords, 0, 0, 0); + } + } + + @ClusterTest + public void testClassicConsumerConsumeMessagesWithLogAppendTime() throws Exception { + testConsumeMessagesWithLogAppendTime(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerConsumeMessagesWithLogAppendTime() throws Exception { + testConsumeMessagesWithLogAppendTime(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testConsumeMessagesWithLogAppendTime(Map consumerConfig) throws Exception { + var topicName = "testConsumeMessagesWithLogAppendTime"; + var startTime = System.currentTimeMillis(); + var numRecords = 50; + cluster.createTopic(topicName, 2, (short) 2, Map.of(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime")); + + try (Consumer consumer = cluster.consumer(consumerConfig)) { + // Test non-compressed messages + var tp1 = new TopicPartition(topicName, 0); + sendRecords(cluster, tp1, numRecords); + consumer.assign(List.of(tp1)); + consumeAndVerifyRecordsWithTimeTypeLogAppend(consumer, tp1, numRecords, startTime); + + // Test compressed messages + var tp2 = new TopicPartition(topicName, 1); + sendCompressedMessages(numRecords, tp2); + consumer.assign(List.of(tp2)); + consumeAndVerifyRecordsWithTimeTypeLogAppend(consumer, tp2, numRecords, startTime); + } + } + + @ClusterTest + public void testClassicConsumerListTopics() throws Exception { + testListTopics(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerListTopics() throws Exception { + testListTopics(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testListTopics(Map consumerConfig) throws Exception { + var numParts = 2; + var topic1 = "part-test-topic-1"; + var topic2 = "part-test-topic-2"; + var topic3 = "part-test-topic-3"; + cluster.createTopic(topic1, numParts, (short) 1); + cluster.createTopic(topic2, numParts, (short) 1); + cluster.createTopic(topic3, numParts, (short) 1); + + sendRecords(cluster, new TopicPartition(topic1, 0), 1); + + try (var consumer = cluster.consumer(consumerConfig)) { + // consumer some messages, and we can list the internal topic __consumer_offsets + consumer.subscribe(List.of(topic1)); + consumer.poll(Duration.ofMillis(100)); + var topics = consumer.listTopics(); + assertNotNull(topics); + assertEquals(4, topics.size()); + assertEquals(2, topics.get(topic1).size()); + assertEquals(2, topics.get(topic2).size()); + assertEquals(2, topics.get(topic3).size()); + } + } + + @ClusterTest + public void testClassicConsumerPauseStateNotPreservedByRebalance() throws Exception { + testPauseStateNotPreservedByRebalance(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + SESSION_TIMEOUT_MS_CONFIG, 100, + HEARTBEAT_INTERVAL_MS_CONFIG, 30 + )); + } + + @ClusterTest + public void testAsyncConsumerPauseStateNotPreservedByRebalance() throws Exception { + testPauseStateNotPreservedByRebalance(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testPauseStateNotPreservedByRebalance(Map consumerConfig) throws Exception { + try (Producer producer = cluster.producer(); + Consumer consumer = cluster.consumer(consumerConfig) + ) { + var startingTimestamp = System.currentTimeMillis(); + sendRecords(producer, TP, 5, startingTimestamp); + consumer.subscribe(List.of(TOPIC)); + consumeAndVerifyRecords(consumer, TP, 5, 0, 0, startingTimestamp); + consumer.pause(List.of(TP)); + + // subscribe to a new topic to trigger a rebalance + consumer.subscribe(List.of("topic2")); + + // after rebalance, our position should be reset and our pause state lost, + // so we should be able to consume from the beginning + consumeAndVerifyRecords(consumer, TP, 0, 5, 0, startingTimestamp); + } + } + + @ClusterTest + public void testClassicConsumerPerPartitionLeadMetricsCleanUpWithSubscribe() throws Exception { + String consumerClientId = "testClassicConsumerPerPartitionLeadMetricsCleanUpWithSubscribe"; + testPerPartitionLeadMetricsCleanUpWithSubscribe(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, consumerClientId, + CLIENT_ID_CONFIG, consumerClientId + ), consumerClientId); + } + + @ClusterTest + public void testAsyncConsumerPerPartitionLeadMetricsCleanUpWithSubscribe() throws Exception { + String consumerClientId = "testAsyncConsumerPerPartitionLeadMetricsCleanUpWithSubscribe"; + testPerPartitionLeadMetricsCleanUpWithSubscribe(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, consumerClientId, + CLIENT_ID_CONFIG, consumerClientId + ), consumerClientId); + } + + private void testPerPartitionLeadMetricsCleanUpWithSubscribe( + Map consumerConfig, + String consumerClientId + ) throws Exception { + var numMessages = 1000; + var topic2 = "topic2"; + var tp2 = new TopicPartition(TOPIC, 1); + cluster.createTopic(topic2, 2, (short) BROKER_COUNT); + + try (Consumer consumer = cluster.consumer(consumerConfig)) { + // send some messages. + sendRecords(cluster, TP, numMessages); + + // Test subscribe + // Create a consumer and consumer some messages. + var listener = new TestConsumerReassignmentListener(); + consumer.subscribe(List.of(TOPIC, topic2), listener); + var records = awaitNonEmptyRecords(consumer, TP); + assertEquals(1, listener.callsToAssigned, "should be assigned once"); + + // Verify the metric exist. + Map tags1 = Map.of( + "client-id", consumerClientId, + "topic", TP.topic(), + "partition", String.valueOf(TP.partition()) + ); + + Map tags2 = Map.of( + "client-id", consumerClientId, + "topic", tp2.topic(), + "partition", String.valueOf(tp2.partition()) + ); + + var fetchLead0 = consumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags1)); + assertNotNull(fetchLead0); + assertEquals((double) records.count(), fetchLead0.metricValue(), "The lead should be " + records.count()); + + // Remove topic from subscription + consumer.subscribe(List.of(topic2), listener); + awaitRebalance(consumer, listener); + + // Verify the metric has gone + assertNull(consumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags1))); + assertNull(consumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags2))); + } + } + + @ClusterTest + public void testClassicConsumerPerPartitionLagMetricsCleanUpWithSubscribe() throws Exception { + String consumerClientId = "testClassicConsumerPerPartitionLagMetricsCleanUpWithSubscribe"; + testPerPartitionLagMetricsCleanUpWithSubscribe(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, consumerClientId, + CLIENT_ID_CONFIG, consumerClientId + ), consumerClientId); + } + + @ClusterTest + public void testAsyncConsumerPerPartitionLagMetricsCleanUpWithSubscribe() throws Exception { + String consumerClientId = "testAsyncConsumerPerPartitionLagMetricsCleanUpWithSubscribe"; + testPerPartitionLagMetricsCleanUpWithSubscribe(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, consumerClientId, + CLIENT_ID_CONFIG, consumerClientId + ), consumerClientId); + } + + private void testPerPartitionLagMetricsCleanUpWithSubscribe( + Map consumerConfig, + String consumerClientId + ) throws Exception { + int numMessages = 1000; + var topic2 = "topic2"; + var tp2 = new TopicPartition(TOPIC, 1); + cluster.createTopic(topic2, 2, (short) BROKER_COUNT); + + try (Consumer consumer = cluster.consumer(consumerConfig)) { + // send some messages. + sendRecords(cluster, TP, numMessages); + + // Test subscribe + // Create a consumer and consumer some messages. + var listener = new TestConsumerReassignmentListener(); + consumer.subscribe(List.of(TOPIC, topic2), listener); + var records = awaitNonEmptyRecords(consumer, TP); + assertEquals(1, listener.callsToAssigned, "should be assigned once"); + + // Verify the metric exist. + Map tags1 = Map.of( + "client-id", consumerClientId, + "topic", TP.topic(), + "partition", String.valueOf(TP.partition()) + ); + + Map tags2 = Map.of( + "client-id", consumerClientId, + "topic", tp2.topic(), + "partition", String.valueOf(tp2.partition()) + ); + + var fetchLag0 = consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags1)); + assertNotNull(fetchLag0); + var expectedLag = numMessages - records.count(); + assertEquals(expectedLag, (double) fetchLag0.metricValue(), EPSILON, "The lag should be " + expectedLag); + + // Remove topic from subscription + consumer.subscribe(List.of(topic2), listener); + awaitRebalance(consumer, listener); + + // Verify the metric has gone + assertNull(consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags1))); + assertNull(consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags2))); + } + } + + @ClusterTest + public void testClassicConsumerPerPartitionLeadMetricsCleanUpWithAssign() throws Exception { + String consumerClientId = "testClassicConsumerPerPartitionLeadMetricsCleanUpWithAssign"; + testPerPartitionLeadMetricsCleanUpWithAssign(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, consumerClientId, + CLIENT_ID_CONFIG, consumerClientId + ), consumerClientId); + } + + @ClusterTest + public void testAsyncConsumerPerPartitionLeadMetricsCleanUpWithAssign() throws Exception { + String consumerClientId = "testAsyncConsumerPerPartitionLeadMetricsCleanUpWithAssign"; + testPerPartitionLeadMetricsCleanUpWithAssign(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, consumerClientId, + CLIENT_ID_CONFIG, consumerClientId + ), consumerClientId); + } + + private void testPerPartitionLeadMetricsCleanUpWithAssign( + Map consumerConfig, + String consumerClientId + ) throws Exception { + var numMessages = 1000; + var tp2 = new TopicPartition(TOPIC, 1); + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + + try (Producer producer = cluster.producer(); + Consumer consumer = cluster.consumer(consumerConfig) + ) { + // Test assign send some messages. + sendRecords(producer, TP, numMessages, System.currentTimeMillis()); + sendRecords(producer, tp2, numMessages, System.currentTimeMillis()); + + consumer.assign(List.of(TP)); + var records = awaitNonEmptyRecords(consumer, TP); + + // Verify the metric exist. + Map tags = Map.of( + "client-id", consumerClientId, + "topic", TP.topic(), + "partition", String.valueOf(TP.partition()) + ); + + var fetchLead = consumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags)); + assertNotNull(fetchLead); + assertEquals((double) records.count(), fetchLead.metricValue(), "The lead should be " + records.count()); + + consumer.assign(List.of(tp2)); + awaitNonEmptyRecords(consumer, tp2); + assertNull(consumer.metrics().get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags))); + } + } + + @ClusterTest + public void testClassicConsumerPerPartitionLagMetricsCleanUpWithAssign() throws Exception { + String consumerClientId = "testClassicConsumerPerPartitionLagMetricsCleanUpWithAssign"; + testPerPartitionLagMetricsCleanUpWithAssign(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, consumerClientId, + CLIENT_ID_CONFIG, consumerClientId + ), consumerClientId); + } + + @ClusterTest + public void testAsyncConsumerPerPartitionLagMetricsCleanUpWithAssign() throws Exception { + String consumerClientId = "testAsyncConsumerPerPartitionLagMetricsCleanUpWithAssign"; + testPerPartitionLagMetricsCleanUpWithAssign(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, consumerClientId, + CLIENT_ID_CONFIG, consumerClientId + ), consumerClientId); + } + + private void testPerPartitionLagMetricsCleanUpWithAssign( + Map consumerConfig, + String consumerClientId + ) throws Exception { + var numMessages = 1000; + var tp2 = new TopicPartition(TOPIC, 1); + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + + try (Producer producer = cluster.producer(); + Consumer consumer = cluster.consumer(consumerConfig) + ) { + // Test assign send some messages. + sendRecords(producer, TP, numMessages, System.currentTimeMillis()); + sendRecords(producer, tp2, numMessages, System.currentTimeMillis()); + + consumer.assign(List.of(TP)); + var records = awaitNonEmptyRecords(consumer, TP); + + // Verify the metric exist. + Map tags = Map.of( + "client-id", consumerClientId, + "topic", TP.topic(), + "partition", String.valueOf(TP.partition()) + ); + + var fetchLag = consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags)); + assertNotNull(fetchLag); + + var expectedLag = numMessages - records.count(); + assertEquals(expectedLag, (double) fetchLag.metricValue(), EPSILON, "The lag should be " + expectedLag); + consumer.assign(List.of(tp2)); + awaitNonEmptyRecords(consumer, tp2); + assertNull(consumer.metrics().get(new MetricName(TP + ".records-lag", "consumer-fetch-manager-metrics", "", tags))); + assertNull(consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags))); + } + } + + @ClusterTest + public void testClassicConsumerPerPartitionLagMetricsWhenReadCommitted() throws Exception { + String consumerClientId = "testClassicConsumerPerPartitionLagMetricsWhenReadCommitted"; + testPerPartitionLagMetricsWhenReadCommitted(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, consumerClientId, + CLIENT_ID_CONFIG, consumerClientId, + ISOLATION_LEVEL_CONFIG, "read_committed" + ), consumerClientId); + } + + @ClusterTest + public void testAsyncConsumerPerPartitionLagMetricsWhenReadCommitted() throws Exception { + String consumerClientId = "testAsyncConsumerPerPartitionLagMetricsWhenReadCommitted"; + testPerPartitionLagMetricsWhenReadCommitted(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, consumerClientId, + CLIENT_ID_CONFIG, consumerClientId, + ISOLATION_LEVEL_CONFIG, "read_committed" + ), consumerClientId); + } + + private void testPerPartitionLagMetricsWhenReadCommitted( + Map consumerConfig, + String consumerClientId + ) throws Exception { + var numMessages = 1000; + var tp2 = new TopicPartition(TOPIC, 1); + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + + try (Producer producer = cluster.producer(); + Consumer consumer = cluster.consumer(consumerConfig) + ) { + // Test assign send some messages. + sendRecords(producer, TP, numMessages, System.currentTimeMillis()); + sendRecords(producer, tp2, numMessages, System.currentTimeMillis()); + + consumer.assign(List.of(TP)); + awaitNonEmptyRecords(consumer, TP); + + // Verify the metric exist. + Map tags = Map.of( + "client-id", consumerClientId, + "topic", TP.topic(), + "partition", String.valueOf(TP.partition()) + ); + + var fetchLag = consumer.metrics().get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags)); + assertNotNull(fetchLag); + } + } + + @ClusterTest + public void testClassicConsumerQuotaMetricsNotCreatedIfNoQuotasConfigured() throws Exception { + var consumerClientId = "testClassicConsumerQuotaMetricsNotCreatedIfNoQuotasConfigured"; + testQuotaMetricsNotCreatedIfNoQuotasConfigured(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, consumerClientId, + CLIENT_ID_CONFIG, consumerClientId, + ISOLATION_LEVEL_CONFIG, "read_committed" + ), consumerClientId); + } + + @ClusterTest + public void testAsyncConsumerQuotaMetricsNotCreatedIfNoQuotasConfigured() throws Exception { + var consumerClientId = "testAsyncConsumerQuotaMetricsNotCreatedIfNoQuotasConfigured"; + testQuotaMetricsNotCreatedIfNoQuotasConfigured(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, consumerClientId, + CLIENT_ID_CONFIG, consumerClientId, + ISOLATION_LEVEL_CONFIG, "read_committed" + ), consumerClientId); + } + + private void testQuotaMetricsNotCreatedIfNoQuotasConfigured( + Map consumerConfig, + String consumerClientId + ) throws Exception { + var producerClientId = UUID.randomUUID().toString(); + var numRecords = 1000; + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + + try (Producer producer = cluster.producer(Map.of(ProducerConfig.CLIENT_ID_CONFIG, producerClientId)); + Consumer consumer = cluster.consumer(consumerConfig) + ) { + var startingTimestamp = System.currentTimeMillis(); + sendRecords(producer, TP, numRecords, startingTimestamp); + + consumer.assign(List.of(TP)); + consumer.seek(TP, 0); + consumeAndVerifyRecords(consumer, TP, numRecords, 0, 0, startingTimestamp); + + var brokers = cluster.brokers().values(); + brokers.forEach(broker -> assertNoMetric(broker, "byte-rate", QuotaType.PRODUCE, producerClientId)); + brokers.forEach(broker -> assertNoMetric(broker, "throttle-time", QuotaType.PRODUCE, producerClientId)); + brokers.forEach(broker -> assertNoMetric(broker, "byte-rate", QuotaType.FETCH, consumerClientId)); + brokers.forEach(broker -> assertNoMetric(broker, "throttle-time", QuotaType.FETCH, consumerClientId)); + brokers.forEach(broker -> assertNoMetric(broker, "request-time", QuotaType.REQUEST, producerClientId)); + brokers.forEach(broker -> assertNoMetric(broker, "throttle-time", QuotaType.REQUEST, producerClientId)); + brokers.forEach(broker -> assertNoMetric(broker, "request-time", QuotaType.REQUEST, consumerClientId)); + brokers.forEach(broker -> assertNoMetric(broker, "throttle-time", QuotaType.REQUEST, consumerClientId)); + } + } + + private void assertNoMetric(KafkaBroker broker, String name, QuotaType quotaType, String clientId) { + var metricName = broker.metrics().metricName(name, quotaType.toString(), "", "user", "", "client-id", clientId); + assertNull(broker.metrics().metric(metricName), "Metric should not have been created " + metricName); + } + + @ClusterTest + public void testClassicConsumerSeekThrowsIllegalStateIfPartitionsNotAssigned() throws Exception { + testSeekThrowsIllegalStateIfPartitionsNotAssigned(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerSeekThrowsIllegalStateIfPartitionsNotAssigned() throws Exception { + testSeekThrowsIllegalStateIfPartitionsNotAssigned(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testSeekThrowsIllegalStateIfPartitionsNotAssigned(Map consumerConfig) throws Exception { + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + try (var consumer = cluster.consumer(consumerConfig)) { + var e = assertThrows(IllegalStateException.class, () -> consumer.seekToEnd(List.of(TP))); + assertEquals("No current assignment for partition " + TP, e.getMessage()); + } + } + + @ClusterTest + public void testClassicConsumingWithNullGroupId() throws Exception { + testConsumingWithNullGroupId(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(), + VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(), + BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers() + )); + } + + @ClusterTest + public void testAsyncConsumerConsumingWithNullGroupId() throws Exception { + testConsumingWithNullGroupId(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(), + VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(), + BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers() + )); + } + + private void testConsumingWithNullGroupId(Map consumerConfig) throws Exception { + var partition = 0; + cluster.createTopic(TOPIC, 1, (short) 1); + + // consumer 1 uses the default group id and consumes from earliest offset + Map consumer1Config = new HashMap<>(consumerConfig); + consumer1Config.put(AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumer1Config.put(CLIENT_ID_CONFIG, "consumer1"); + + // consumer 2 uses the default group id and consumes from latest offset + Map consumer2Config = new HashMap<>(consumerConfig); + consumer2Config.put(AUTO_OFFSET_RESET_CONFIG, "latest"); + consumer2Config.put(CLIENT_ID_CONFIG, "consumer2"); + + // consumer 3 uses the default group id and starts from an explicit offset + Map consumer3Config = new HashMap<>(consumerConfig); + consumer3Config.put(CLIENT_ID_CONFIG, "consumer3"); + + try (Producer producer = cluster.producer(); + Consumer consumer1 = new KafkaConsumer<>(consumer1Config); + Consumer consumer2 = new KafkaConsumer<>(consumer2Config); + Consumer consumer3 = new KafkaConsumer<>(consumer3Config) + ) { + producer.send(new ProducerRecord<>(TOPIC, partition, "k1".getBytes(), "v1".getBytes())).get(); + producer.send(new ProducerRecord<>(TOPIC, partition, "k2".getBytes(), "v2".getBytes())).get(); + producer.send(new ProducerRecord<>(TOPIC, partition, "k3".getBytes(), "v3".getBytes())).get(); + + consumer1.assign(List.of(TP)); + consumer2.assign(List.of(TP)); + consumer3.assign(List.of(TP)); + consumer3.seek(TP, 1); + + var numRecords1 = consumer1.poll(Duration.ofMillis(5000)).count(); + assertThrows(InvalidGroupIdException.class, consumer1::commitSync); + assertThrows(InvalidGroupIdException.class, () -> consumer2.committed(Set.of(TP))); + + var numRecords2 = consumer2.poll(Duration.ofMillis(5000)).count(); + var numRecords3 = consumer3.poll(Duration.ofMillis(5000)).count(); + + consumer1.unsubscribe(); + consumer2.unsubscribe(); + consumer3.unsubscribe(); + + assertTrue(consumer1.assignment().isEmpty()); + assertTrue(consumer2.assignment().isEmpty()); + assertTrue(consumer3.assignment().isEmpty()); + + consumer1.close(); + consumer2.close(); + consumer3.close(); + + assertEquals(3, numRecords1, "Expected consumer1 to consume from earliest offset"); + assertEquals(0, numRecords2, "Expected consumer2 to consume from latest offset"); + assertEquals(2, numRecords3, "Expected consumer3 to consume from offset 1"); + } + } + + @ClusterTest + public void testClassicConsumerNullGroupIdNotSupportedIfCommitting() throws Exception { + testNullGroupIdNotSupportedIfCommitting(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(), + VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(), + BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers(), + AUTO_OFFSET_RESET_CONFIG, "earliest", + CLIENT_ID_CONFIG, "consumer1" + )); + } + + @ClusterTest + public void testAsyncConsumerNullGroupIdNotSupportedIfCommitting() throws Exception { + testNullGroupIdNotSupportedIfCommitting(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(), + VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(), + BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers(), + AUTO_OFFSET_RESET_CONFIG, "earliest", + CLIENT_ID_CONFIG, "consumer1" + )); + } + + private void testNullGroupIdNotSupportedIfCommitting(Map consumerConfig) throws Exception { + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + try (var consumer = new KafkaConsumer<>(consumerConfig)) { + consumer.assign(List.of(TP)); + assertThrows(InvalidGroupIdException.class, consumer::commitSync); + } + } + + @ClusterTest + public void testClassicConsumerStaticConsumerDetectsNewPartitionCreatedAfterRestart() throws Exception { + testStaticConsumerDetectsNewPartitionCreatedAfterRestart(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, "my-group-id", + GROUP_INSTANCE_ID_CONFIG, "my-instance-id", + METADATA_MAX_AGE_CONFIG, 100, + MAX_POLL_INTERVAL_MS_CONFIG, 6000 + )); + } + + @ClusterTest + public void testAsyncConsumerStaticConsumerDetectsNewPartitionCreatedAfterRestart() throws Exception { + testStaticConsumerDetectsNewPartitionCreatedAfterRestart(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + GROUP_ID_CONFIG, "my-group-id", + GROUP_INSTANCE_ID_CONFIG, "my-instance-id", + METADATA_MAX_AGE_CONFIG, 100, + MAX_POLL_INTERVAL_MS_CONFIG, 6000 + )); + } + + private void testStaticConsumerDetectsNewPartitionCreatedAfterRestart(Map consumerConfig) throws Exception { + var foo = "foo"; + var foo0 = new TopicPartition(foo, 0); + var foo1 = new TopicPartition(foo, 1); + cluster.createTopic(foo, 1, (short) 1); + + try (Consumer consumer1 = cluster.consumer(consumerConfig); + Consumer consumer2 = cluster.consumer(consumerConfig); + var admin = cluster.admin() + ) { + consumer1.subscribe(List.of(foo)); + awaitAssignment(consumer1, Set.of(foo0)); + consumer1.close(); + + consumer2.subscribe(List.of(foo)); + awaitAssignment(consumer2, Set.of(foo0)); + + admin.createPartitions(Map.of(foo, NewPartitions.increaseTo(2))).all().get(); + awaitAssignment(consumer2, Set.of(foo0, foo1)); + } + } + + @ClusterTest + public void testClassicConsumerEndOffsets() throws Exception { + testEndOffsets(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + METADATA_MAX_AGE_CONFIG, 100, + MAX_POLL_INTERVAL_MS_CONFIG, 6000 + )); + } + + @ClusterTest + public void testAsyncConsumerEndOffsets() throws Exception { + testEndOffsets(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + METADATA_MAX_AGE_CONFIG, 100, + MAX_POLL_INTERVAL_MS_CONFIG, 6000 + )); + } + + private void testEndOffsets(Map consumerConfig) throws Exception { + var numRecords = 10000; + var tp2 = new TopicPartition(TOPIC, 1); + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + + try (Producer producer = cluster.producer(); + Consumer consumer = cluster.consumer(consumerConfig) + ) { + var startingTimestamp = System.currentTimeMillis(); + for (var i = 0; i < numRecords; i++) { + var timestamp = startingTimestamp + (long) i; + ProducerRecord record = new ProducerRecord<>( + TP.topic(), + TP.partition(), + timestamp, + ("key " + i).getBytes(), + ("value " + i).getBytes() + ); + producer.send(record); + } + producer.flush(); + + consumer.subscribe(List.of(TOPIC)); + awaitAssignment(consumer, Set.of(TP, tp2)); + + var endOffsets = consumer.endOffsets(Set.of(TP)); + assertEquals(numRecords, endOffsets.get(TP)); + } + } + + @ClusterTest + public void testClassicConsumerFetchOffsetsForTime() throws Exception { + testFetchOffsetsForTime(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerFetchOffsetsForTime() throws Exception { + testFetchOffsetsForTime(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testFetchOffsetsForTime(Map consumerConfig) throws Exception { + var numPartitions = 2; + var tp2 = new TopicPartition(TOPIC, 1); + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + + try (Producer producer = cluster.producer(); + Consumer consumer = cluster.consumer(consumerConfig) + ) { + Map timestampsToSearch = new HashMap<>(); + for (int part = 0, i = 0; part < numPartitions; part++, i++) { + var tp = new TopicPartition(TOPIC, part); + // key, val, and timestamp equal to the sequence number. + sendRecords(producer, tp, 100, 0); + timestampsToSearch.put(tp, i * 20L); + } + // Test negative target time + assertThrows(IllegalArgumentException.class, () -> consumer.offsetsForTimes(Map.of(TP, -1L))); + var timestampOffsets = consumer.offsetsForTimes(timestampsToSearch); + + var timestampTp0 = timestampOffsets.get(TP); + assertEquals(0, timestampTp0.offset()); + assertEquals(0, timestampTp0.timestamp()); + assertEquals(Optional.of(0), timestampTp0.leaderEpoch()); + + var timestampTp1 = timestampOffsets.get(tp2); + assertEquals(20, timestampTp1.offset()); + assertEquals(20, timestampTp1.timestamp()); + assertEquals(Optional.of(0), timestampTp1.leaderEpoch()); + } + } + + @ClusterTest + public void testClassicConsumerPositionRespectsTimeout() { + testPositionRespectsTimeout(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerPositionRespectsTimeout() { + testPositionRespectsTimeout(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testPositionRespectsTimeout(Map consumerConfig) { + var topicPartition = new TopicPartition(TOPIC, 15); + try (var consumer = cluster.consumer(consumerConfig)) { + consumer.assign(List.of(topicPartition)); + // When position() is called for a topic/partition that doesn't exist, the consumer will repeatedly update the + // local metadata. However, it should give up after the user-supplied timeout has past. + assertThrows(TimeoutException.class, () -> consumer.position(topicPartition, Duration.ofSeconds(3))); + } + } + + @ClusterTest + public void testClassicConsumerPositionRespectsWakeup() { + testPositionRespectsWakeup(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerPositionRespectsWakeup() { + testPositionRespectsWakeup(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testPositionRespectsWakeup(Map consumerConfig) { + var topicPartition = new TopicPartition(TOPIC, 15); + try (var consumer = cluster.consumer(consumerConfig)) { + consumer.assign(List.of(topicPartition)); + CompletableFuture.runAsync(() -> { + try { + TimeUnit.SECONDS.sleep(1); + consumer.wakeup(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + assertThrows(WakeupException.class, () -> consumer.position(topicPartition, Duration.ofSeconds(3))); + } + } + + @ClusterTest + public void testClassicConsumerPositionWithErrorConnectionRespectsWakeup() { + testPositionWithErrorConnectionRespectsWakeup(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + // make sure the connection fails + BOOTSTRAP_SERVERS_CONFIG, "localhost:12345" + )); + } + + @ClusterTest + public void testAsyncConsumerPositionWithErrorConnectionRespectsWakeup() { + testPositionWithErrorConnectionRespectsWakeup(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + // make sure the connection fails + BOOTSTRAP_SERVERS_CONFIG, "localhost:12345" + )); + } + + private void testPositionWithErrorConnectionRespectsWakeup(Map consumerConfig) { + var topicPartition = new TopicPartition(TOPIC, 15); + try (var consumer = cluster.consumer(consumerConfig)) { + consumer.assign(List.of(topicPartition)); + CompletableFuture.runAsync(() -> { + try { + TimeUnit.SECONDS.sleep(1); + consumer.wakeup(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + assertThrows(WakeupException.class, () -> consumer.position(topicPartition, Duration.ofSeconds(100))); + } + } + + @Flaky("KAFKA-18031") + @ClusterTest + public void testClassicConsumerCloseLeavesGroupOnInterrupt() throws Exception { + testCloseLeavesGroupOnInterrupt(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(), + VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(), + AUTO_OFFSET_RESET_CONFIG, "earliest", + GROUP_ID_CONFIG, "group_test,", + BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers() + )); + } + + @Flaky("KAFKA-18031") + @ClusterTest + public void testAsyncConsumerCloseLeavesGroupOnInterrupt() throws Exception { + testCloseLeavesGroupOnInterrupt(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT), + KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(), + VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(), + AUTO_OFFSET_RESET_CONFIG, "earliest", + GROUP_ID_CONFIG, "group_test,", + BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers() + )); + } + + private void testCloseLeavesGroupOnInterrupt(Map consumerConfig) throws Exception { + try (Consumer consumer = cluster.consumer(consumerConfig)) { + var listener = new TestConsumerReassignmentListener(); + consumer.subscribe(List.of(TOPIC), listener); + awaitRebalance(consumer, listener); + + assertEquals(1, listener.callsToAssigned); + assertEquals(0, listener.callsToRevoked); + + try { + Thread.currentThread().interrupt(); + assertThrows(InterruptException.class, consumer::close); + } finally { + // Clear the interrupted flag so we don't create problems for subsequent tests. + Thread.interrupted(); + } + + assertEquals(1, listener.callsToAssigned); + assertEquals(1, listener.callsToRevoked); + + Map consumerConfigMap = new HashMap<>(consumerConfig); + var config = new ConsumerConfig(consumerConfigMap); + + // Set the wait timeout to be only *half* the configured session timeout. This way we can make sure that the + // consumer explicitly left the group as opposed to being kicked out by the broker. + var leaveGroupTimeoutMs = config.getInt(SESSION_TIMEOUT_MS_CONFIG) / 2; + + TestUtils.waitForCondition( + () -> checkGroupMemberEmpty(config), + leaveGroupTimeoutMs, + "Consumer did not leave the consumer group within " + leaveGroupTimeoutMs + " ms of close" + ); + } + } + + private boolean checkGroupMemberEmpty(ConsumerConfig config) { + try (var admin = cluster.admin()) { + var groupId = config.getString(GROUP_ID_CONFIG); + var result = admin.describeConsumerGroups(List.of(groupId)); + var groupDescription = result.describedGroups().get(groupId).get(); + return groupDescription.members().isEmpty(); + } catch (ExecutionException | InterruptedException e) { + return false; + } + } + + @ClusterTest + public void testClassicConsumerOffsetRelatedWhenTimeoutZero() throws Exception { + testOffsetRelatedWhenTimeoutZero(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT) + )); + } + + @ClusterTest + public void testAsyncConsumerOffsetRelatedWhenTimeoutZero() throws Exception { + testOffsetRelatedWhenTimeoutZero(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT) + )); + } + + private void testOffsetRelatedWhenTimeoutZero(Map consumerConfig) throws Exception { + cluster.createTopic(TOPIC, 2, (short) BROKER_COUNT); + try (var consumer = cluster.consumer(consumerConfig)) { + var result1 = consumer.beginningOffsets(List.of(TP), Duration.ZERO); + assertNotNull(result1); + assertEquals(0, result1.size()); + + var result2 = consumer.endOffsets(List.of(TP), Duration.ZERO); + assertNotNull(result2); + assertEquals(0, result2.size()); + + var result3 = consumer.offsetsForTimes(Map.of(TP, 0L), Duration.ZERO); + assertNotNull(result3); + assertEquals(1, result3.size()); + assertNull(result3.get(TP)); + } + } + + private void sendCompressedMessages(int numRecords, TopicPartition tp) { + Map config = Map.of( + COMPRESSION_TYPE_CONFIG, CompressionType.GZIP.name, + LINGER_MS_CONFIG, Integer.MAX_VALUE + ); + try (Producer producer = cluster.producer(config)) { + IntStream.range(0, numRecords).forEach(i -> producer.send(new ProducerRecord<>( + tp.topic(), + tp.partition(), + (long) i, + ("key " + i).getBytes(), + ("value " + i).getBytes() + ))); + } + } + + private ConsumerRecords awaitNonEmptyRecords( + Consumer consumer, + TopicPartition tp + ) throws Exception { + AtomicReference> result = new AtomicReference<>(); + + TestUtils.waitForCondition(() -> { + var polledRecords = consumer.poll(Duration.ofSeconds(10)); + boolean hasRecords = !polledRecords.isEmpty(); + if (hasRecords) { + result.set(polledRecords); + } + return hasRecords; + }, "Timed out waiting for non-empty records from topic " + tp.topic() + " partition " + tp.partition()); + + return result.get(); + } + + public static class SerializerImpl implements Serializer { + private final ByteArraySerializer serializer = new ByteArraySerializer(); + + @Override + public byte[] serialize(String topic, byte[] data) { + throw new RuntimeException("This method should not be called"); + } + + @Override + public byte[] serialize(String topic, Headers headers, byte[] data) { + headers.add("content-type", "application/octet-stream".getBytes()); + return serializer.serialize(topic, headers, data); + } + } + + public static class DeserializerImpl implements Deserializer { + private final ByteArrayDeserializer deserializer = new ByteArrayDeserializer(); + + @Override + public byte[] deserialize(String topic, byte[] data) { + throw new RuntimeException("This method should not be called"); + } + + @Override + public byte[] deserialize(String topic, Headers headers, byte[] data) { + Header contentType = headers.lastHeader("content-type"); + assertNotNull(contentType); + assertEquals("application/octet-stream", new String(contentType.value())); + return deserializer.deserialize(topic, headers, data); + } + } +} diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala index 2d5e1a2c631..adfb657b776 100644 --- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala @@ -19,10 +19,9 @@ package kafka.api import kafka.utils.TestInfoUtils import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, GroupProtocol} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig} -import org.apache.kafka.common.header.Headers import org.apache.kafka.common.{ClusterResource, ClusterResourceListener, PartitionInfo} import org.apache.kafka.common.internals.Topic -import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, Serializer} +import org.apache.kafka.common.serialization.{Deserializer, Serializer} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.MethodSource @@ -130,41 +129,4 @@ object BaseConsumerTest { override def onUpdate(clusterResource: ClusterResource): Unit = updateConsumerCount.incrementAndGet() override def deserialize(topic: String, data: Array[Byte]): Array[Byte] = data } - - class SerializerImpl extends Serializer[Array[Byte]] { - var serializer = new ByteArraySerializer() - - override def serialize(topic: String, headers: Headers, data: Array[Byte]): Array[Byte] = { - headers.add("content-type", "application/octet-stream".getBytes) - serializer.serialize(topic, data) - } - - override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = serializer.configure(configs, isKey) - - override def close(): Unit = serializer.close() - - override def serialize(topic: String, data: Array[Byte]): Array[Byte] = { - fail("method should not be invoked") - null - } - } - - class DeserializerImpl extends Deserializer[Array[Byte]] { - var deserializer = new ByteArrayDeserializer() - - override def deserialize(topic: String, headers: Headers, data: Array[Byte]): Array[Byte] = { - val header = headers.lastHeader("content-type") - assertEquals("application/octet-stream", if (header == null) null else new String(header.value())) - deserializer.deserialize(topic, data) - } - - override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = deserializer.configure(configs, isKey) - - override def close(): Unit = deserializer.close() - - override def deserialize(topic: String, data: Array[Byte]): Array[Byte] = { - fail("method should not be invoked") - null - } - } } diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index c9d33354c5d..fc60cab1d0d 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -12,820 +12,21 @@ */ package kafka.api -import kafka.api.BaseConsumerTest.{DeserializerImpl, SerializerImpl} - -import java.time.Duration import java.util -import java.util.{Locale, Optional, Properties} -import kafka.server.KafkaBroker import kafka.utils.{TestInfoUtils, TestUtils} -import org.apache.kafka.clients.admin.{NewPartitions, NewTopic} import org.apache.kafka.clients.consumer._ -import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord} -import org.apache.kafka.common.config.TopicConfig -import org.apache.kafka.common.errors.{InterruptException, InvalidGroupIdException, InvalidTopicException, TimeoutException, WakeupException} -import org.apache.kafka.common.record.{CompressionType, TimestampType} -import org.apache.kafka.common.serialization._ +import org.apache.kafka.common.errors.InterruptException import org.apache.kafka.common.test.api.Flaky -import org.apache.kafka.common.{MetricName, TopicPartition} -import org.apache.kafka.server.quota.QuotaType -import org.apache.kafka.test.{MockConsumerInterceptor, MockProducerInterceptor} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.Timeout import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.MethodSource -import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit} +import java.util.concurrent.ExecutionException @Timeout(600) class PlaintextConsumerTest extends BaseConsumerTest { - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testHeaders(groupProtocol: String): Unit = { - val numRecords = 1 - val record = new ProducerRecord(tp.topic, tp.partition, null, "key".getBytes, "value".getBytes) - - record.headers().add("headerKey", "headerValue".getBytes) - - val producer = createProducer() - producer.send(record) - - val consumer = createConsumer() - assertEquals(0, consumer.assignment.size) - consumer.assign(java.util.List.of(tp)) - assertEquals(1, consumer.assignment.size) - - consumer.seek(tp, 0) - val records = consumeRecords(consumer = consumer, numRecords = numRecords) - - assertEquals(numRecords, records.size) - - for (i <- 0 until numRecords) { - val record = records(i) - val header = record.headers().lastHeader("headerKey") - assertEquals("headerValue", if (header == null) null else new String(header.value())) - } - } - - private def testHeadersSerializeDeserialize(serializer: Serializer[Array[Byte]], deserializer: Deserializer[Array[Byte]]): Unit = { - val numRecords = 1 - val record = new ProducerRecord(tp.topic, tp.partition, null, "key".getBytes, "value".getBytes) - - val producer = createProducer( - keySerializer = new ByteArraySerializer, - valueSerializer = serializer) - producer.send(record) - - val consumer = createConsumer( - keyDeserializer = new ByteArrayDeserializer, - valueDeserializer = deserializer) - assertEquals(0, consumer.assignment.size) - consumer.assign(java.util.List.of(tp)) - assertEquals(1, consumer.assignment.size) - - consumer.seek(tp, 0) - val records = consumeRecords(consumer = consumer, numRecords = numRecords) - - assertEquals(numRecords, records.size) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testHeadersSerializerDeserializer(groupProtocol: String): Unit = { - val extendedSerializer = new SerializerImpl - - val extendedDeserializer = new DeserializerImpl - - testHeadersSerializeDeserialize(extendedSerializer, extendedDeserializer) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testAutoOffsetReset(groupProtocol: String): Unit = { - val producer = createProducer() - val startingTimestamp = System.currentTimeMillis() - sendRecords(producer, numRecords = 1, tp, startingTimestamp = startingTimestamp) - - val consumer = createConsumer() - consumer.assign(java.util.List.of(tp)) - consumeAndVerifyRecords(consumer = consumer, numRecords = 1, startingOffset = 0, startingTimestamp = startingTimestamp) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testGroupConsumption(groupProtocol: String): Unit = { - val producer = createProducer() - val startingTimestamp = System.currentTimeMillis() - sendRecords(producer, numRecords = 10, tp, startingTimestamp = startingTimestamp) - - val consumer = createConsumer() - consumer.subscribe(java.util.List.of(topic)) - consumeAndVerifyRecords(consumer = consumer, numRecords = 1, startingOffset = 0, startingTimestamp = startingTimestamp) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testPartitionsFor(groupProtocol: String): Unit = { - val numParts = 2 - createTopic("part-test", numParts) - val consumer = createConsumer() - val parts = consumer.partitionsFor("part-test") - assertNotNull(parts) - assertEquals(2, parts.size) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testPartitionsForAutoCreate(groupProtocol: String): Unit = { - val consumer = createConsumer() - // First call would create the topic - consumer.partitionsFor("non-exist-topic") - TestUtils.waitUntilTrue(() => { - !consumer.partitionsFor("non-exist-topic").isEmpty - }, s"Timed out while awaiting non empty partitions.") - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testPartitionsForInvalidTopic(groupProtocol: String): Unit = { - val consumer = createConsumer() - assertThrows(classOf[InvalidTopicException], () => consumer.partitionsFor(";3# ads,{234")) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testSeek(groupProtocol: String): Unit = { - val consumer = createConsumer() - val totalRecords = 50L - val mid = totalRecords / 2 - - // Test seek non-compressed message - val producer = createProducer() - val startingTimestamp = 0 - sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = startingTimestamp) - consumer.assign(java.util.List.of(tp)) - - consumer.seekToEnd(java.util.List.of(tp)) - assertEquals(totalRecords, consumer.position(tp)) - assertTrue(consumer.poll(Duration.ofMillis(50)).isEmpty) - - consumer.seekToBeginning(java.util.List.of(tp)) - assertEquals(0L, consumer.position(tp)) - consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 0, startingTimestamp = startingTimestamp) - - consumer.seek(tp, mid) - assertEquals(mid, consumer.position(tp)) - - consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = mid.toInt, startingKeyAndValueIndex = mid.toInt, - startingTimestamp = mid) - - // Test seek compressed message - sendCompressedMessages(totalRecords.toInt, tp2) - consumer.assign(java.util.List.of(tp2)) - - consumer.seekToEnd(java.util.List.of(tp2)) - assertEquals(totalRecords, consumer.position(tp2)) - assertTrue(consumer.poll(Duration.ofMillis(50)).isEmpty) - - consumer.seekToBeginning(java.util.List.of(tp2)) - assertEquals(0L, consumer.position(tp2)) - consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 0, tp = tp2) - - consumer.seek(tp2, mid) - assertEquals(mid, consumer.position(tp2)) - consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = mid.toInt, startingKeyAndValueIndex = mid.toInt, - startingTimestamp = mid, tp = tp2) - } - - private def sendCompressedMessages(numRecords: Int, tp: TopicPartition): Unit = { - val producerProps = new Properties() - producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.GZIP.name) - producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, Int.MaxValue.toString) - val producer = createProducer(configOverrides = producerProps) - (0 until numRecords).foreach { i => - producer.send(new ProducerRecord(tp.topic, tp.partition, i.toLong, s"key $i".getBytes, s"value $i".getBytes)) - } - producer.close() - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testPartitionPauseAndResume(groupProtocol: String): Unit = { - val partitions = java.util.List.of(tp) - val producer = createProducer() - var startingTimestamp = System.currentTimeMillis() - sendRecords(producer, numRecords = 5, tp, startingTimestamp = startingTimestamp) - - val consumer = createConsumer() - consumer.assign(partitions) - consumeAndVerifyRecords(consumer = consumer, numRecords = 5, startingOffset = 0, startingTimestamp = startingTimestamp) - consumer.pause(partitions) - startingTimestamp = System.currentTimeMillis() - sendRecords(producer, numRecords = 5, tp, startingTimestamp = startingTimestamp) - assertTrue(consumer.poll(Duration.ofMillis(100)).isEmpty) - consumer.resume(partitions) - consumeAndVerifyRecords(consumer = consumer, numRecords = 5, startingOffset = 5, startingTimestamp = startingTimestamp) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testInterceptors(groupProtocol: String): Unit = { - val appendStr = "mock" - MockConsumerInterceptor.resetCounters() - MockProducerInterceptor.resetCounters() - - // create producer with interceptor - val producerProps = new Properties() - producerProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, classOf[MockProducerInterceptor].getName) - producerProps.put("mock.interceptor.append", appendStr) - val testProducer = createProducer(keySerializer = new StringSerializer, - valueSerializer = new StringSerializer, - configOverrides = producerProps) - - // produce records - val numRecords = 10 - (0 until numRecords).map { i => - testProducer.send(new ProducerRecord(tp.topic, tp.partition, s"key $i", s"value $i")) - }.foreach(_.get) - assertEquals(numRecords, MockProducerInterceptor.ONSEND_COUNT.intValue) - assertEquals(numRecords, MockProducerInterceptor.ON_SUCCESS_COUNT.intValue) - // send invalid record - assertThrows(classOf[Throwable], () => testProducer.send(null), () => "Should not allow sending a null record") - assertEquals(1, MockProducerInterceptor.ON_ERROR_COUNT.intValue, "Interceptor should be notified about exception") - assertEquals(0, MockProducerInterceptor.ON_ERROR_WITH_METADATA_COUNT.intValue(), "Interceptor should not receive metadata with an exception when record is null") - - // create consumer with interceptor - this.consumerConfig.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockConsumerInterceptor") - val testConsumer = createConsumer(keyDeserializer = new StringDeserializer, valueDeserializer = new StringDeserializer) - testConsumer.assign(java.util.List.of(tp)) - testConsumer.seek(tp, 0) - - // consume and verify that values are modified by interceptors - val records = consumeRecords(testConsumer, numRecords) - for (i <- 0 until numRecords) { - val record = records(i) - assertEquals(s"key $i", new String(record.key)) - assertEquals(s"value $i$appendStr".toUpperCase(Locale.ROOT), new String(record.value)) - } - - // commit sync and verify onCommit is called - val commitCountBefore = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue - testConsumer.commitSync(java.util.Map.of(tp, new OffsetAndMetadata(2L))) - assertEquals(2, testConsumer.committed(java.util.Set.of(tp)).get(tp).offset) - assertEquals(commitCountBefore + 1, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue) - - // commit async and verify onCommit is called - sendAndAwaitAsyncCommit(testConsumer, Some(Map(tp -> new OffsetAndMetadata(5L)))) - assertEquals(5, testConsumer.committed(java.util.Set.of(tp)).get(tp).offset) - assertEquals(commitCountBefore + 2, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue) - - testConsumer.close() - testProducer.close() - - // cleanup - MockConsumerInterceptor.resetCounters() - MockProducerInterceptor.resetCounters() - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testInterceptorsWithWrongKeyValue(groupProtocol: String): Unit = { - val appendStr = "mock" - // create producer with interceptor that has different key and value types from the producer - val producerProps = new Properties() - producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) - producerProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockProducerInterceptor") - producerProps.put("mock.interceptor.append", appendStr) - val testProducer = createProducer() - - // producing records should succeed - testProducer.send(new ProducerRecord(tp.topic(), tp.partition(), s"key".getBytes, s"value will not be modified".getBytes)) - - // create consumer with interceptor that has different key and value types from the consumer - this.consumerConfig.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockConsumerInterceptor") - val testConsumer = createConsumer() - - testConsumer.assign(java.util.List.of(tp)) - testConsumer.seek(tp, 0) - - // consume and verify that values are not modified by interceptors -- their exceptions are caught and logged, but not propagated - val records = consumeRecords(testConsumer, 1) - val record = records.head - assertEquals(s"value will not be modified", new String(record.value())) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testConsumeMessagesWithCreateTime(groupProtocol: String): Unit = { - val numRecords = 50 - // Test non-compressed messages - val producer = createProducer() - val startingTimestamp = System.currentTimeMillis() - sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) - val consumer = createConsumer() - consumer.assign(java.util.List.of(tp)) - consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, startingOffset = 0, startingTimestamp = startingTimestamp) - - // Test compressed messages - sendCompressedMessages(numRecords, tp2) - consumer.assign(java.util.List.of(tp2)) - consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, tp = tp2, startingOffset = 0) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testConsumeMessagesWithLogAppendTime(groupProtocol: String): Unit = { - val topicName = "testConsumeMessagesWithLogAppendTime" - val topicProps = new Properties() - topicProps.setProperty(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime") - createTopic(topicName, 2, 2, topicProps) - - val startTime = System.currentTimeMillis() - val numRecords = 50 - - // Test non-compressed messages - val tp1 = new TopicPartition(topicName, 0) - val producer = createProducer() - sendRecords(producer, numRecords, tp1) - - val consumer = createConsumer() - consumer.assign(java.util.List.of(tp1)) - consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, tp = tp1, startingOffset = 0, - startingTimestamp = startTime, timestampType = TimestampType.LOG_APPEND_TIME) - - // Test compressed messages - val tp2 = new TopicPartition(topicName, 1) - sendCompressedMessages(numRecords, tp2) - consumer.assign(java.util.List.of(tp2)) - consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, tp = tp2, startingOffset = 0, - startingTimestamp = startTime, timestampType = TimestampType.LOG_APPEND_TIME) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testListTopics(groupProtocol: String): Unit = { - val numParts = 2 - val topic1 = "part-test-topic-1" - val topic2 = "part-test-topic-2" - val topic3 = "part-test-topic-3" - createTopic(topic1, numParts) - createTopic(topic2, numParts) - createTopic(topic3, numParts) - - val consumer = createConsumer() - val topics = consumer.listTopics() - assertNotNull(topics) - assertEquals(5, topics.size()) - assertEquals(5, topics.keySet().size()) - assertEquals(2, topics.get(topic1).size) - assertEquals(2, topics.get(topic2).size) - assertEquals(2, topics.get(topic3).size) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testPauseStateNotPreservedByRebalance(groupProtocol: String): Unit = { - if (groupProtocol.equalsIgnoreCase(GroupProtocol.CLASSIC.name)) { - this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test - this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30") - } - val consumer = createConsumer() - - val producer = createProducer() - val startingTimestamp = System.currentTimeMillis() - sendRecords(producer, numRecords = 5, tp, startingTimestamp = startingTimestamp) - consumer.subscribe(java.util.List.of(topic)) - consumeAndVerifyRecords(consumer = consumer, numRecords = 5, startingOffset = 0, startingTimestamp = startingTimestamp) - consumer.pause(java.util.List.of(tp)) - - // subscribe to a new topic to trigger a rebalance - consumer.subscribe(java.util.List.of("topic2")) - - // after rebalance, our position should be reset and our pause state lost, - // so we should be able to consume from the beginning - consumeAndVerifyRecords(consumer = consumer, numRecords = 0, startingOffset = 5, startingTimestamp = startingTimestamp) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testPerPartitionLeadMetricsCleanUpWithSubscribe(groupProtocol: String): Unit = { - val numMessages = 1000 - val topic2 = "topic2" - createTopic(topic2, 2, brokerCount) - // send some messages. - val producer = createProducer() - sendRecords(producer, numMessages, tp) - // Test subscribe - // Create a consumer and consumer some messages. - consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithSubscribe") - consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithSubscribe") - val consumer = createConsumer() - val listener = new TestConsumerReassignmentListener - consumer.subscribe(java.util.List.of(topic, topic2), listener) - val records = awaitNonEmptyRecords(consumer, tp) - assertEquals(1, listener.callsToAssigned, "should be assigned once") - // Verify the metric exist. - val tags1 = new util.HashMap[String, String]() - tags1.put("client-id", "testPerPartitionLeadMetricsCleanUpWithSubscribe") - tags1.put("topic", tp.topic()) - tags1.put("partition", String.valueOf(tp.partition())) - - val tags2 = new util.HashMap[String, String]() - tags2.put("client-id", "testPerPartitionLeadMetricsCleanUpWithSubscribe") - tags2.put("topic", tp2.topic()) - tags2.put("partition", String.valueOf(tp2.partition())) - val fetchLead0 = consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags1)) - assertNotNull(fetchLead0) - assertEquals(records.count.toDouble, fetchLead0.metricValue(), s"The lead should be ${records.count}") - - // Remove topic from subscription - consumer.subscribe(java.util.List.of(topic2), listener) - awaitRebalance(consumer, listener) - // Verify the metric has gone - assertNull(consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags1))) - assertNull(consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags2))) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testPerPartitionLagMetricsCleanUpWithSubscribe(groupProtocol: String): Unit = { - val numMessages = 1000 - val topic2 = "topic2" - createTopic(topic2, 2, brokerCount) - // send some messages. - val producer = createProducer() - sendRecords(producer, numMessages, tp) - // Test subscribe - // Create a consumer and consumer some messages. - consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPerPartitionLagMetricsCleanUpWithSubscribe") - consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testPerPartitionLagMetricsCleanUpWithSubscribe") - val consumer = createConsumer() - val listener = new TestConsumerReassignmentListener - consumer.subscribe(java.util.List.of(topic, topic2), listener) - val records = awaitNonEmptyRecords(consumer, tp) - assertEquals(1, listener.callsToAssigned, "should be assigned once") - // Verify the metric exist. - val tags1 = new util.HashMap[String, String]() - tags1.put("client-id", "testPerPartitionLagMetricsCleanUpWithSubscribe") - tags1.put("topic", tp.topic()) - tags1.put("partition", String.valueOf(tp.partition())) - - val tags2 = new util.HashMap[String, String]() - tags2.put("client-id", "testPerPartitionLagMetricsCleanUpWithSubscribe") - tags2.put("topic", tp2.topic()) - tags2.put("partition", String.valueOf(tp2.partition())) - val fetchLag0 = consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags1)) - assertNotNull(fetchLag0) - val expectedLag = numMessages - records.count - assertEquals(expectedLag, fetchLag0.metricValue.asInstanceOf[Double], epsilon, s"The lag should be $expectedLag") - - // Remove topic from subscription - consumer.subscribe(java.util.List.of(topic2), listener) - awaitRebalance(consumer, listener) - // Verify the metric has gone - assertNull(consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags1))) - assertNull(consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags2))) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testPerPartitionLeadMetricsCleanUpWithAssign(groupProtocol: String): Unit = { - val numMessages = 1000 - // Test assign - // send some messages. - val producer = createProducer() - sendRecords(producer, numMessages, tp) - sendRecords(producer, numMessages, tp2) - - consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithAssign") - consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testPerPartitionLeadMetricsCleanUpWithAssign") - val consumer = createConsumer() - consumer.assign(java.util.List.of(tp)) - val records = awaitNonEmptyRecords(consumer, tp) - // Verify the metric exist. - val tags = new util.HashMap[String, String]() - tags.put("client-id", "testPerPartitionLeadMetricsCleanUpWithAssign") - tags.put("topic", tp.topic()) - tags.put("partition", String.valueOf(tp.partition())) - val fetchLead = consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags)) - assertNotNull(fetchLead) - - assertEquals(records.count.toDouble, fetchLead.metricValue(), s"The lead should be ${records.count}") - - consumer.assign(java.util.List.of(tp2)) - awaitNonEmptyRecords(consumer ,tp2) - assertNull(consumer.metrics.get(new MetricName("records-lead", "consumer-fetch-manager-metrics", "", tags))) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testPerPartitionLagMetricsCleanUpWithAssign(groupProtocol: String): Unit = { - val numMessages = 1000 - // Test assign - // send some messages. - val producer = createProducer() - sendRecords(producer, numMessages, tp) - sendRecords(producer, numMessages, tp2) - - consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPerPartitionLagMetricsCleanUpWithAssign") - consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testPerPartitionLagMetricsCleanUpWithAssign") - val consumer = createConsumer() - consumer.assign(java.util.List.of(tp)) - val records = awaitNonEmptyRecords(consumer, tp) - // Verify the metric exist. - val tags = new util.HashMap[String, String]() - tags.put("client-id", "testPerPartitionLagMetricsCleanUpWithAssign") - tags.put("topic", tp.topic()) - tags.put("partition", String.valueOf(tp.partition())) - val fetchLag = consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags)) - assertNotNull(fetchLag) - - val expectedLag = numMessages - records.count - assertEquals(expectedLag, fetchLag.metricValue.asInstanceOf[Double], epsilon, s"The lag should be $expectedLag") - - consumer.assign(java.util.List.of(tp2)) - awaitNonEmptyRecords(consumer, tp2) - assertNull(consumer.metrics.get(new MetricName(tp.toString + ".records-lag", "consumer-fetch-manager-metrics", "", tags))) - assertNull(consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags))) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testPerPartitionLagMetricsWhenReadCommitted(groupProtocol: String): Unit = { - val numMessages = 1000 - // send some messages. - val producer = createProducer() - sendRecords(producer, numMessages, tp) - sendRecords(producer, numMessages, tp2) - - consumerConfig.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed") - consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testPerPartitionLagMetricsCleanUpWithAssign") - consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testPerPartitionLagMetricsCleanUpWithAssign") - val consumer = createConsumer() - consumer.assign(java.util.List.of(tp)) - awaitNonEmptyRecords(consumer, tp) - // Verify the metric exist. - val tags = new util.HashMap[String, String]() - tags.put("client-id", "testPerPartitionLagMetricsCleanUpWithAssign") - tags.put("topic", tp.topic()) - tags.put("partition", String.valueOf(tp.partition())) - val fetchLag = consumer.metrics.get(new MetricName("records-lag", "consumer-fetch-manager-metrics", "", tags)) - assertNotNull(fetchLag) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testQuotaMetricsNotCreatedIfNoQuotasConfigured(groupProtocol: String): Unit = { - val numRecords = 1000 - val producer = createProducer() - val startingTimestamp = System.currentTimeMillis() - sendRecords(producer, numRecords, tp, startingTimestamp = startingTimestamp) - - val consumer = createConsumer() - consumer.assign(java.util.List.of(tp)) - consumer.seek(tp, 0) - consumeAndVerifyRecords(consumer = consumer, numRecords = numRecords, startingOffset = 0, startingTimestamp = startingTimestamp) - - def assertNoMetric(broker: KafkaBroker, name: String, quotaType: QuotaType, clientId: String): Unit = { - val metricName = broker.metrics.metricName("throttle-time", - quotaType.toString, - "", - "user", "", - "client-id", clientId) - assertNull(broker.metrics.metric(metricName), "Metric should not have been created " + metricName) - } - brokers.foreach(assertNoMetric(_, "byte-rate", QuotaType.PRODUCE, producerClientId)) - brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.PRODUCE, producerClientId)) - brokers.foreach(assertNoMetric(_, "byte-rate", QuotaType.FETCH, consumerClientId)) - brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.FETCH, consumerClientId)) - - brokers.foreach(assertNoMetric(_, "request-time", QuotaType.REQUEST, producerClientId)) - brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.REQUEST, producerClientId)) - brokers.foreach(assertNoMetric(_, "request-time", QuotaType.REQUEST, consumerClientId)) - brokers.foreach(assertNoMetric(_, "throttle-time", QuotaType.REQUEST, consumerClientId)) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testConsumingWithNullGroupId(groupProtocol: String): Unit = { - val topic = "test_topic" - val partition = 0 - val tp = new TopicPartition(topic, partition) - createTopic(topic) - - val producer = createProducer() - producer.send(new ProducerRecord(topic, partition, "k1".getBytes, "v1".getBytes)).get() - producer.send(new ProducerRecord(topic, partition, "k2".getBytes, "v2".getBytes)).get() - producer.send(new ProducerRecord(topic, partition, "k3".getBytes, "v3".getBytes)).get() - producer.close() - - // consumer 1 uses the default group id and consumes from earliest offset - val consumer1Config = new Properties(consumerConfig) - consumer1Config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") - consumer1Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer1") - val consumer1 = createConsumer( - configOverrides = consumer1Config, - configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG)) - - // consumer 2 uses the default group id and consumes from latest offset - val consumer2Config = new Properties(consumerConfig) - consumer2Config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest") - consumer2Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer2") - val consumer2 = createConsumer( - configOverrides = consumer2Config, - configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG)) - - // consumer 3 uses the default group id and starts from an explicit offset - val consumer3Config = new Properties(consumerConfig) - consumer3Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer3") - val consumer3 = createConsumer( - configOverrides = consumer3Config, - configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG)) - - consumer1.assign(util.List.of(tp)) - consumer2.assign(util.List.of(tp)) - consumer3.assign(util.List.of(tp)) - consumer3.seek(tp, 1) - - val numRecords1 = consumer1.poll(Duration.ofMillis(5000)).count() - assertThrows(classOf[InvalidGroupIdException], () => consumer1.commitSync()) - assertThrows(classOf[InvalidGroupIdException], () => consumer2.committed(java.util.Set.of(tp))) - - val numRecords2 = consumer2.poll(Duration.ofMillis(5000)).count() - val numRecords3 = consumer3.poll(Duration.ofMillis(5000)).count() - - consumer1.unsubscribe() - consumer2.unsubscribe() - consumer3.unsubscribe() - - assertTrue(consumer1.assignment().isEmpty) - assertTrue(consumer2.assignment().isEmpty) - assertTrue(consumer3.assignment().isEmpty) - - consumer1.close() - consumer2.close() - consumer3.close() - - assertEquals(3, numRecords1, "Expected consumer1 to consume from earliest offset") - assertEquals(0, numRecords2, "Expected consumer2 to consume from latest offset") - assertEquals(2, numRecords3, "Expected consumer3 to consume from offset 1") - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testNullGroupIdNotSupportedIfCommitting(groupProtocol: String): Unit = { - val consumer1Config = new Properties(consumerConfig) - consumer1Config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") - consumer1Config.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer1") - val consumer1 = createConsumer( - configOverrides = consumer1Config, - configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG)) - - consumer1.assign(java.util.List.of(tp)) - assertThrows(classOf[InvalidGroupIdException], () => consumer1.commitSync()) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testStaticConsumerDetectsNewPartitionCreatedAfterRestart(groupProtocol: String): Unit = { - val foo = "foo" - val foo0 = new TopicPartition(foo, 0) - val foo1 = new TopicPartition(foo, 1) - - val admin = createAdminClient() - admin.createTopics(java.util.List.of(new NewTopic(foo, 1, 1.toShort))).all.get - - val consumerConfig = new Properties - consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id") - consumerConfig.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "my-instance-id") - - val consumer1 = createConsumer(configOverrides = consumerConfig) - consumer1.subscribe(java.util.List.of(foo)) - awaitAssignment(consumer1, Set(foo0)) - consumer1.close() - - val consumer2 = createConsumer(configOverrides = consumerConfig) - consumer2.subscribe(java.util.List.of(foo)) - awaitAssignment(consumer2, Set(foo0)) - - admin.createPartitions(java.util.Map.of(foo, NewPartitions.increaseTo(2))).all.get - - awaitAssignment(consumer2, Set(foo0, foo1)) - - consumer2.close() - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testEndOffsets(groupProtocol: String): Unit = { - val producer = createProducer() - val startingTimestamp = System.currentTimeMillis() - val numRecords = 10000 - (0 until numRecords).map { i => - val timestamp = startingTimestamp + i.toLong - val record = new ProducerRecord(tp.topic(), tp.partition(), timestamp, s"key $i".getBytes, s"value $i".getBytes) - producer.send(record) - record - } - producer.flush() - - val consumer = createConsumer() - consumer.subscribe(java.util.List.of(topic)) - awaitAssignment(consumer, Set(tp, tp2)) - - val endOffsets = consumer.endOffsets(java.util.Set.of(tp)) - assertEquals(numRecords, endOffsets.get(tp)) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testSeekThrowsIllegalStateIfPartitionsNotAssigned(groupProtocol: String): Unit = { - val tp = new TopicPartition(topic, 0) - val consumer = createConsumer(configOverrides = consumerConfig) - val e: Exception = assertThrows(classOf[IllegalStateException], () => consumer.seekToEnd(util.List.of(tp))) - assertEquals("No current assignment for partition " + tp, e.getMessage) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testFetchOffsetsForTime(groupProtocol: String): Unit = { - val numPartitions = 2 - val producer = createProducer() - val timestampsToSearch = new util.HashMap[TopicPartition, java.lang.Long]() - var i = 0 - for (part <- 0 until numPartitions) { - val tp = new TopicPartition(topic, part) - // key, val, and timestamp equal to the sequence number. - sendRecords(producer, numRecords = 100, tp, startingTimestamp = 0) - timestampsToSearch.put(tp, (i * 20).toLong) - i += 1 - } - - val consumer = createConsumer() - // Test negative target time - assertThrows(classOf[IllegalArgumentException], - () => consumer.offsetsForTimes(util.Map.of(new TopicPartition(topic, 0), -1))) - val timestampOffsets = consumer.offsetsForTimes(timestampsToSearch) - - val timestampTp0 = timestampOffsets.get(new TopicPartition(topic, 0)) - assertEquals(0, timestampTp0.offset) - assertEquals(0, timestampTp0.timestamp) - assertEquals(Optional.of(0), timestampTp0.leaderEpoch) - - val timestampTp1 = timestampOffsets.get(new TopicPartition(topic, 1)) - assertEquals(20, timestampTp1.offset) - assertEquals(20, timestampTp1.timestamp) - assertEquals(Optional.of(0), timestampTp1.leaderEpoch) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - @Timeout(15) - def testPositionRespectsTimeout(groupProtocol: String): Unit = { - val topicPartition = new TopicPartition(topic, 15) - val consumer = createConsumer() - consumer.assign(java.util.List.of(topicPartition)) - - // When position() is called for a topic/partition that doesn't exist, the consumer will repeatedly update the - // local metadata. However, it should give up after the user-supplied timeout has past. - assertThrows(classOf[TimeoutException], () => consumer.position(topicPartition, Duration.ofSeconds(3))) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - @Timeout(15) - def testPositionRespectsWakeup(groupProtocol: String): Unit = { - val topicPartition = new TopicPartition(topic, 15) - val consumer = createConsumer() - consumer.assign(java.util.List.of(topicPartition)) - - CompletableFuture.runAsync { () => - TimeUnit.SECONDS.sleep(1) - consumer.wakeup() - } - - assertThrows(classOf[WakeupException], () => consumer.position(topicPartition, Duration.ofSeconds(3))) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - @Timeout(15) - def testPositionWithErrorConnectionRespectsWakeup(groupProtocol: String): Unit = { - val topicPartition = new TopicPartition(topic, 15) - val properties = new Properties() - properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:12345") // make sure the connection fails - val consumer = createConsumer(configOverrides = properties) - consumer.assign(java.util.List.of(topicPartition)) - - CompletableFuture.runAsync { () => - TimeUnit.SECONDS.sleep(1) - consumer.wakeup() - } - - assertThrows(classOf[WakeupException], () => consumer.position(topicPartition, Duration.ofSeconds(100))) - } - @Flaky("KAFKA-18031") @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) @MethodSource(Array("getTestGroupProtocolParametersAll")) @@ -871,22 +72,4 @@ class PlaintextConsumerTest extends BaseConsumerTest { waitTimeMs=leaveGroupTimeoutMs ) } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testOffsetRelatedWhenTimeoutZero(groupProtocol: String): Unit = { - val consumer = createConsumer() - val result1 = consumer.beginningOffsets(util.List.of(tp), Duration.ZERO) - assertNotNull(result1) - assertEquals(0, result1.size()) - - val result2 = consumer.endOffsets(util.List.of(tp), Duration.ZERO) - assertNotNull(result2) - assertEquals(0, result2.size()) - - val result3 = consumer.offsetsForTimes(java.util.Map.of(tp, 0), Duration.ZERO) - assertNotNull(result3) - assertEquals(1, result3.size()) - assertNull(result3.get(tp)) - } }