From ae608c1cb298a236fcb2773f17abb164958dbf70 Mon Sep 17 00:00:00 2001 From: Ken Huang Date: Wed, 16 Apr 2025 11:57:14 +0800 Subject: [PATCH] KAFKA-19042 Move PlaintextConsumerCallbackTest to client-integration-tests module (#19298) Use Java to rewrite `PlaintextConsumerCallbackTest` by new test infra and move it to client-integration-tests module. Reviewers: TengYao Chi , Chia-Ping Tsai --- .../PlaintextConsumerCallbackTest.java | 352 ++++++++++++++++++ .../api/PlaintextConsumerCallbackTest.scala | 175 --------- 2 files changed, 352 insertions(+), 175 deletions(-) create mode 100644 clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCallbackTest.java delete mode 100644 core/src/test/scala/integration/kafka/api/PlaintextConsumerCallbackTest.scala diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCallbackTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCallbackTest.java new file mode 100644 index 00000000000..800ca3b2c78 --- /dev/null +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCallbackTest.java @@ -0,0 +1,352 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.TestUtils; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Type; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC; +import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@ClusterTestDefaults( + types = {Type.KRAFT}, + brokers = 3 +) +public class PlaintextConsumerCallbackTest { + + private final ClusterInstance cluster; + private final String topic = "topic"; + private final TopicPartition tp = new TopicPartition(topic, 0); + + public PlaintextConsumerCallbackTest(ClusterInstance clusterInstance) { + this.cluster = clusterInstance; + } + + @ClusterTest + public void testClassicConsumerRebalanceListenerAssignOnPartitionsAssigned() throws InterruptedException { + testRebalanceListenerAssignOnPartitionsAssigned(CLASSIC); + } + + @ClusterTest + public void testAsyncConsumerRebalanceListenerAssignOnPartitionsAssigned() throws InterruptedException { + testRebalanceListenerAssignOnPartitionsAssigned(CONSUMER); + } + + private void testRebalanceListenerAssignOnPartitionsAssigned(GroupProtocol groupProtocol) throws InterruptedException { + try (var consumer = createConsumer(groupProtocol)) { + triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, partitions) -> { + var e = assertThrows(IllegalStateException.class, () -> executeConsumer.assign(List.of(tp))); + assertEquals("Subscription to topics, partitions and pattern are mutually exclusive", e.getMessage()); + }); + } + } + + @ClusterTest + public void testClassicConsumerRebalanceListenerAssignmentOnPartitionsAssigned() throws InterruptedException { + testRebalanceListenerAssignmentOnPartitionsAssigned(CLASSIC); + } + + @ClusterTest + public void testAsyncConsumerRebalanceListenerAssignmentOnPartitionsAssigned() throws InterruptedException { + testRebalanceListenerAssignmentOnPartitionsAssigned(CONSUMER); + } + + private void testRebalanceListenerAssignmentOnPartitionsAssigned(GroupProtocol groupProtocol) throws InterruptedException { + try (var consumer = createConsumer(groupProtocol)) { + triggerOnPartitionsAssigned(tp, consumer, + (executeConsumer, partitions) -> assertTrue(executeConsumer.assignment().contains(tp)) + ); + } + } + + @ClusterTest + public void testClassicConsumerRebalanceListenerBeginningOffsetsOnPartitionsAssigned() throws InterruptedException { + testRebalanceListenerBeginningOffsetsOnPartitionsAssigned(CLASSIC); + } + + @ClusterTest + public void testAsyncConsumerRebalanceListenerBeginningOffsetsOnPartitionsAssigned() throws InterruptedException { + testRebalanceListenerBeginningOffsetsOnPartitionsAssigned(CONSUMER); + } + + private void testRebalanceListenerBeginningOffsetsOnPartitionsAssigned(GroupProtocol groupProtocol) throws InterruptedException { + try (var consumer = createConsumer(groupProtocol)) { + triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, partitions) -> { + var map = executeConsumer.beginningOffsets(List.of(tp)); + assertTrue(map.containsKey(tp)); + assertEquals(0L, map.get(tp)); + }); + } + } + + @ClusterTest + public void testClassicConsumerRebalanceListenerAssignOnPartitionsRevoked() throws InterruptedException { + testRebalanceListenerAssignOnPartitionsRevoked(CLASSIC); + } + + @ClusterTest + public void testAsyncConsumerRebalanceListenerAssignOnPartitionsRevoked() throws InterruptedException { + testRebalanceListenerAssignOnPartitionsRevoked(CONSUMER); + } + + private void testRebalanceListenerAssignOnPartitionsRevoked(GroupProtocol groupProtocol) throws InterruptedException { + triggerOnPartitionsRevoked(tp, groupProtocol, (consumer, partitions) -> { + var e = assertThrows(IllegalStateException.class, () -> consumer.assign(List.of(tp))); + assertEquals("Subscription to topics, partitions and pattern are mutually exclusive", e.getMessage()); + }); + } + + @ClusterTest + public void testClassicConsumerRebalanceListenerAssignmentOnPartitionsRevoked() throws InterruptedException { + triggerOnPartitionsRevoked(tp, CLASSIC, + (consumer, partitions) -> assertTrue(consumer.assignment().contains(tp)) + ); + } + + @ClusterTest + public void testAsyncConsumerRebalanceListenerAssignmentOnPartitionsRevoked() throws InterruptedException { + triggerOnPartitionsRevoked(tp, CONSUMER, + (consumer, partitions) -> assertTrue(consumer.assignment().contains(tp)) + ); + } + + @ClusterTest + public void testClassicConsumerRebalanceListenerBeginningOffsetsOnPartitionsRevoked() throws InterruptedException { + testRebalanceListenerBeginningOffsetsOnPartitionsRevoked(CLASSIC); + } + + @ClusterTest + public void testAsyncConsumerRebalanceListenerBeginningOffsetsOnPartitionsRevoked() throws InterruptedException { + testRebalanceListenerBeginningOffsetsOnPartitionsRevoked(CONSUMER); + } + + private void testRebalanceListenerBeginningOffsetsOnPartitionsRevoked(GroupProtocol groupProtocol) throws InterruptedException { + triggerOnPartitionsRevoked(tp, groupProtocol, (consumer, partitions) -> { + var map = consumer.beginningOffsets(List.of(tp)); + assertTrue(map.containsKey(tp)); + assertEquals(0L, map.get(tp)); + }); + } + + @ClusterTest + public void testClassicConsumerGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback() throws InterruptedException { + testGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback(CLASSIC); + } + + @ClusterTest + public void testAsyncConsumerGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback() throws InterruptedException { + testGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback(CONSUMER); + } + + private void testGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback(GroupProtocol groupProtocol) throws InterruptedException { + try (var consumer = createConsumer(groupProtocol)) { + triggerOnPartitionsAssigned(tp, consumer, + (executeConsumer, partitions) -> assertDoesNotThrow(() -> executeConsumer.position(tp)) + ); + } + } + + @ClusterTest + public void testClassicConsumerSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback() throws InterruptedException { + testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback(CLASSIC); + } + + @ClusterTest + public void testAsyncConsumerSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback() throws InterruptedException { + testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback(CONSUMER); + } + + private void testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback(GroupProtocol groupProtocol) throws InterruptedException { + try (var consumer = createConsumer(groupProtocol)) { + var startingOffset = 100L; + var totalRecords = 120; + var startingTimestamp = 0L; + + sendRecords(totalRecords, startingTimestamp); + + triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, partitions) -> { + executeConsumer.seek(tp, startingOffset); + executeConsumer.pause(List.of(tp)); + }); + + assertTrue(consumer.paused().contains(tp)); + consumer.resume(List.of(tp)); + consumeAndVerifyRecords( + consumer, + (int) (totalRecords - startingOffset), + (int) startingOffset, + (int) startingOffset, + startingOffset + ); + } + } + + private void triggerOnPartitionsAssigned( + TopicPartition tp, + Consumer consumer, + BiConsumer, Collection> execute + ) throws InterruptedException { + var partitionsAssigned = new AtomicBoolean(false); + consumer.subscribe(List.of(topic), new ConsumerRebalanceListener() { + @Override + public void onPartitionsAssigned(Collection partitions) { + // Make sure the partition used in the test is actually assigned before continuing. + if (partitions.contains(tp)) { + execute.accept(consumer, partitions); + partitionsAssigned.set(true); + } + } + + @Override + public void onPartitionsRevoked(Collection partitions) { + // noop + } + }); + TestUtils.waitForCondition( + () -> { + consumer.poll(Duration.ofMillis(100)); + return partitionsAssigned.get(); + }, + "Timed out before expected rebalance completed" + ); + } + + private void triggerOnPartitionsRevoked( + TopicPartition tp, + GroupProtocol protocol, + BiConsumer, Collection> execute + ) throws InterruptedException { + var partitionsAssigned = new AtomicBoolean(false); + var partitionsRevoked = new AtomicBoolean(false); + try (var consumer = createConsumer(protocol)) { + consumer.subscribe(List.of(topic), new ConsumerRebalanceListener() { + @Override + public void onPartitionsAssigned(Collection partitions) { + // Make sure the partition used in the test is actually assigned before continuing. + if (partitions.contains(tp)) { + partitionsAssigned.set(true); + } + } + + @Override + public void onPartitionsRevoked(Collection partitions) { + // Make sure the partition used in the test is actually revoked before continuing. + if (partitions.contains(tp)) { + execute.accept(consumer, partitions); + partitionsRevoked.set(true); + } + } + }); + TestUtils.waitForCondition( + () -> { + consumer.poll(Duration.ofMillis(100)); + return partitionsAssigned.get(); + }, + "Timed out before expected rebalance completed" + ); + } + assertTrue(partitionsRevoked.get()); + } + + private Consumer createConsumer(GroupProtocol protocol) { + return cluster.consumer(Map.of( + GROUP_PROTOCOL_CONFIG, protocol.name().toLowerCase(Locale.ROOT), + ENABLE_AUTO_COMMIT_CONFIG, "false" + )); + } + + private void sendRecords(int numRecords, long startingTimestamp) { + try (Producer producer = cluster.producer()) { + for (var i = 0; i < numRecords; i++) { + var timestamp = startingTimestamp + i; + var record = new ProducerRecord<>( + tp.topic(), + tp.partition(), + timestamp, + ("key " + i).getBytes(), + ("value " + i).getBytes() + ); + producer.send(record); + } + producer.flush(); + } + } + + protected void consumeAndVerifyRecords( + Consumer consumer, + int numRecords, + int startingOffset, + int startingKeyAndValueIndex, + long startingTimestamp + ) throws InterruptedException { + var records = consumeRecords(consumer, numRecords); + for (var i = 0; i < numRecords; i++) { + var record = records.get(i); + var offset = startingOffset + i; + + assertEquals(tp.topic(), record.topic()); + assertEquals(tp.partition(), record.partition()); + + assertEquals(TimestampType.CREATE_TIME, record.timestampType()); + var timestamp = startingTimestamp + i; + assertEquals(timestamp, record.timestamp()); + + assertEquals(offset, record.offset()); + var keyAndValueIndex = startingKeyAndValueIndex + i; + assertEquals("key " + keyAndValueIndex, new String(record.key())); + assertEquals("value " + keyAndValueIndex, new String(record.value())); + // this is true only because K and V are byte arrays + assertEquals(("key " + keyAndValueIndex).length(), record.serializedKeySize()); + assertEquals(("value " + keyAndValueIndex).length(), record.serializedValueSize()); + } + } + + protected List> consumeRecords( + Consumer consumer, + int numRecords + ) throws InterruptedException { + List> records = new ArrayList<>(); + TestUtils.waitForCondition(() -> { + consumer.poll(Duration.ofMillis(100)).forEach(records::add); + return records.size() >= numRecords; + }, 60000, "Timed out before consuming expected " + numRecords + " records."); + + return records; + } +} diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerCallbackTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerCallbackTest.scala deleted file mode 100644 index e9c0ba0cc22..00000000000 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerCallbackTest.scala +++ /dev/null @@ -1,175 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package integration.kafka.api - -import kafka.api.AbstractConsumerTest -import kafka.utils.{TestInfoUtils, TestUtils} -import org.apache.kafka.clients.consumer.{Consumer, ConsumerRebalanceListener} -import org.apache.kafka.common.TopicPartition -import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertThrows, assertTrue} -import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.MethodSource - -import java.util -import java.util.Arrays.asList -import java.util.Collections -import java.util.concurrent.atomic.AtomicBoolean - -/** - * Integration tests for the consumer that cover interaction with the consumer from within callbacks - * and listeners. - */ -class PlaintextConsumerCallbackTest extends AbstractConsumerTest { - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testConsumerRebalanceListenerAssignOnPartitionsAssigned(groupProtocol: String): Unit = { - val tp = new TopicPartition(topic, 0) - triggerOnPartitionsAssigned(tp, { (consumer, _) => - val e: Exception = assertThrows(classOf[IllegalStateException], () => consumer.assign(Collections.singletonList(tp))) - assertEquals(e.getMessage, "Subscription to topics, partitions and pattern are mutually exclusive") - }) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testConsumerRebalanceListenerAssignmentOnPartitionsAssigned(groupProtocol: String): Unit = { - val tp = new TopicPartition(topic, 0) - triggerOnPartitionsAssigned(tp, { (consumer, _) => - assertTrue(consumer.assignment().contains(tp)); - }) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testConsumerRebalanceListenerBeginningOffsetsOnPartitionsAssigned(groupProtocol: String): Unit = { - val tp = new TopicPartition(topic, 0) - triggerOnPartitionsAssigned(tp, { (consumer, _) => - val map = consumer.beginningOffsets(Collections.singletonList(tp)) - assertTrue(map.containsKey(tp)) - assertEquals(0, map.get(tp)) - }) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testConsumerRebalanceListenerAssignOnPartitionsRevoked(groupProtocol: String): Unit = { - val tp = new TopicPartition(topic, 0) - triggerOnPartitionsRevoked(tp, { (consumer, _) => - val e: Exception = assertThrows(classOf[IllegalStateException], () => consumer.assign(Collections.singletonList(tp))) - assertEquals(e.getMessage, "Subscription to topics, partitions and pattern are mutually exclusive") - }) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testConsumerRebalanceListenerAssignmentOnPartitionsRevoked(groupProtocol: String): Unit = { - val tp = new TopicPartition(topic, 0) - triggerOnPartitionsRevoked(tp, { (consumer, _) => - assertTrue(consumer.assignment().contains(tp)) - }) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testConsumerRebalanceListenerBeginningOffsetsOnPartitionsRevoked(groupProtocol: String): Unit = { - val tp = new TopicPartition(topic, 0) - triggerOnPartitionsRevoked(tp, { (consumer, _) => - val map = consumer.beginningOffsets(Collections.singletonList(tp)) - assertTrue(map.containsKey(tp)) - assertEquals(0, map.get(tp)) - }) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback(groupProtocol: String): Unit = { - val tp = new TopicPartition(topic, 0) - triggerOnPartitionsAssigned(tp, { (consumer, _) => assertDoesNotThrow(() => consumer.position(tp)) }) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback(groupProtocol: String): Unit = { - val consumer = createConsumer() - val startingOffset = 100L - val totalRecords = 120L - - val producer = createProducer() - val startingTimestamp = 0 - sendRecords(producer, totalRecords.toInt, tp, startingTimestamp) - - triggerOnPartitionsAssigned(tp, consumer, { (consumer, _) => - consumer.seek(tp, startingOffset) - consumer.pause(asList(tp)) - }) - - assertTrue(consumer.paused().contains(tp)) - consumer.resume(asList(tp)) - consumeAndVerifyRecords(consumer, numRecords = (totalRecords - startingOffset).toInt, - startingOffset = startingOffset.toInt, startingKeyAndValueIndex = startingOffset.toInt, - startingTimestamp = startingOffset) - } - - private def triggerOnPartitionsAssigned(tp: TopicPartition, - execute: (Consumer[Array[Byte], Array[Byte]], util.Collection[TopicPartition]) => Unit): Unit = { - val consumer = createConsumer() - triggerOnPartitionsAssigned(tp, consumer, execute) - } - - private def triggerOnPartitionsAssigned(tp: TopicPartition, - consumer: Consumer[Array[Byte], Array[Byte]], - execute: (Consumer[Array[Byte], Array[Byte]], util.Collection[TopicPartition]) => Unit): Unit = { - val partitionsAssigned = new AtomicBoolean(false) - consumer.subscribe(asList(topic), new ConsumerRebalanceListener { - override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = { - // Make sure the partition used in the test is actually assigned before continuing. - if (partitions.contains(tp)) { - execute(consumer, partitions) - partitionsAssigned.set(true) - } - } - - override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = { - // noop - } - }) - TestUtils.pollUntilTrue(consumer, () => partitionsAssigned.get(), "Timed out before expected rebalance completed") - } - - private def triggerOnPartitionsRevoked(tp: TopicPartition, - execute: (Consumer[Array[Byte], Array[Byte]], util.Collection[TopicPartition]) => Unit): Unit = { - val consumer = createConsumer() - val partitionsAssigned = new AtomicBoolean(false) - val partitionsRevoked = new AtomicBoolean(false) - consumer.subscribe(asList(topic), new ConsumerRebalanceListener { - override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = { - // Make sure the partition used in the test is actually assigned before continuing. - if (partitions.contains(tp)) { - partitionsAssigned.set(true) - } - } - - override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = { - // Make sure the partition used in the test is actually revoked before continuing. - if (partitions.contains(tp)) { - execute(consumer, partitions) - partitionsRevoked.set(true) - } - } - }) - TestUtils.pollUntilTrue(consumer, () => partitionsAssigned.get(), "Timed out before expected rebalance completed") - consumer.close() - assertTrue(partitionsRevoked.get()) - } -}