diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java index 651c3033f29..0cf81f9538a 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java @@ -300,6 +300,21 @@ public class ClientsTestUtils { assertEquals(initialRevokeCalls, rebalanceListener.callsToRevoked); } + + public static void waitForPollThrowException( + Consumer consumer, + Class exceptedException + ) throws InterruptedException { + TestUtils.waitForCondition(() -> { + try { + consumer.poll(Duration.ZERO); + return false; + } catch (Exception e) { + return exceptedException.isInstance(e); + } + }, "Continuous poll not fail"); + } + /** * This class is intended to replace the test cases in BaseConsumerTest.scala. * When converting tests that extend from BaseConsumerTest.scala to Java, diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerPollTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerPollTest.java index 791d8eee43e..bd90e54db45 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerPollTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerPollTest.java @@ -48,6 +48,7 @@ import static org.apache.kafka.clients.ClientsTestUtils.awaitRebalance; import static org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecords; import static org.apache.kafka.clients.ClientsTestUtils.ensureNoRebalance; import static org.apache.kafka.clients.ClientsTestUtils.sendRecords; +import static org.apache.kafka.clients.ClientsTestUtils.waitForPollThrowException; import static org.apache.kafka.clients.CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG; import static org.apache.kafka.clients.CommonClientConfigs.SESSION_TIMEOUT_MS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG; @@ -503,14 +504,7 @@ public class PlaintextConsumerPollTest { // continuous poll should eventually fail because there is no offset reset strategy set // (fail only when resetting positions after coordinator is known) - TestUtils.waitForCondition(() -> { - try { - consumer.poll(Duration.ZERO); - return false; - } catch (NoOffsetForPartitionException e) { - return true; - } - }, "Continuous poll not fail"); + waitForPollThrowException(consumer, NoOffsetForPartitionException.class); } } diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerSubscriptionTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerSubscriptionTest.java new file mode 100644 index 00000000000..7e014537cd9 --- /dev/null +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerSubscriptionTest.java @@ -0,0 +1,585 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.clients.ClientsTestUtils.TestConsumerReassignmentListener; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidRegularExpression; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.TestUtils; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Type; + +import org.junit.jupiter.api.BeforeEach; + +import java.time.Duration; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +import static org.apache.kafka.clients.ClientsTestUtils.awaitAssignment; +import static org.apache.kafka.clients.ClientsTestUtils.awaitRebalance; +import static org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecords; +import static org.apache.kafka.clients.ClientsTestUtils.sendRecords; +import static org.apache.kafka.clients.ClientsTestUtils.waitForPollThrowException; +import static org.apache.kafka.clients.CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG; +import static org.apache.kafka.clients.CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG; +import static org.apache.kafka.clients.CommonClientConfigs.METADATA_MAX_AGE_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + +@ClusterTestDefaults( + types = {Type.KRAFT}, + brokers = PlaintextConsumerSubscriptionTest.BROKER_COUNT, + serverProperties = { + @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), + @ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, value = "100"), + @ClusterConfigProperty(key = GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, value = "60000"), + @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "10"), + } +) +public class PlaintextConsumerSubscriptionTest { + + public static final int BROKER_COUNT = 3; + private final ClusterInstance cluster; + private final String topic = "topic"; + private final TopicPartition tp = new TopicPartition(topic, 0); + + public PlaintextConsumerSubscriptionTest(ClusterInstance cluster) { + this.cluster = cluster; + } + + @BeforeEach + public void setup() throws InterruptedException { + cluster.createTopic(topic, 2, (short) BROKER_COUNT); + } + + @ClusterTest + public void testClassicConsumerPatternSubscription() throws InterruptedException { + testPatternSubscription(GroupProtocol.CLASSIC); + } + + @ClusterTest + public void testAsyncConsumerPatternSubscription() throws InterruptedException { + testPatternSubscription(GroupProtocol.CONSUMER); + } + + /** + * Verifies that pattern subscription performs as expected. + * The pattern matches the topics 'topic' and 'tblablac', but not 'tblablak' or 'tblab1'. + * It is expected that the consumer is subscribed to all partitions of 'topic' and 'tblablac' after the subscription + * when metadata is refreshed. + * When a new topic 'tsomec' is added afterward, it is expected that upon the next metadata refresh the consumer + * becomes subscribed to this new topic and all partitions of that topic are assigned to it. + */ + public void testPatternSubscription(GroupProtocol groupProtocol) throws InterruptedException { + var numRecords = 10000; + Map config = Map.of( + MAX_POLL_INTERVAL_MS_CONFIG, 6000, + GROUP_PROTOCOL_CONFIG, groupProtocol.name().toLowerCase(Locale.ROOT), + ENABLE_AUTO_COMMIT_CONFIG, false, + METADATA_MAX_AGE_CONFIG, 100 + ); + try (Producer producer = cluster.producer(); + Consumer consumer = cluster.consumer(config) + ) { + sendRecords(producer, tp, numRecords, System.currentTimeMillis()); + + var topic1 = "tblablac"; // matches subscribed pattern + cluster.createTopic(topic1, 2, (short) BROKER_COUNT); + sendRecords(producer, new TopicPartition(topic1, 0), 1000, System.currentTimeMillis()); + sendRecords(producer, new TopicPartition(topic1, 1), 1000, System.currentTimeMillis()); + + var topic2 = "tblablak"; // does not match subscribed pattern + cluster.createTopic(topic2, 2, (short) BROKER_COUNT); + sendRecords(producer, new TopicPartition(topic2, 0), 1000, System.currentTimeMillis()); + sendRecords(producer, new TopicPartition(topic2, 1), 1000, System.currentTimeMillis()); + + var topic3 = "tblab1"; // does not match subscribed pattern + cluster.createTopic(topic3, 2, (short) BROKER_COUNT); + sendRecords(producer, new TopicPartition(topic3, 0), 1000, System.currentTimeMillis()); + sendRecords(producer, new TopicPartition(topic3, 1), 1000, System.currentTimeMillis()); + + assertEquals(0, consumer.assignment().size()); + var pattern = Pattern.compile("t.*c"); + consumer.subscribe(pattern, new TestConsumerReassignmentListener()); + + Set assignment = new HashSet<>(); + assignment.add(new TopicPartition(topic, 0)); + assignment.add(new TopicPartition(topic, 1)); + assignment.add(new TopicPartition(topic1, 0)); + assignment.add(new TopicPartition(topic1, 1)); + + awaitAssignment(consumer, assignment); + + var topic4 = "tsomec"; // matches subscribed pattern + cluster.createTopic(topic4, 2, (short) BROKER_COUNT); + sendRecords(producer, new TopicPartition(topic4, 0), 1000, System.currentTimeMillis()); + sendRecords(producer, new TopicPartition(topic4, 1), 1000, System.currentTimeMillis()); + + assignment.add(new TopicPartition(topic4, 0)); + assignment.add(new TopicPartition(topic4, 1)); + + awaitAssignment(consumer, assignment); + + consumer.unsubscribe(); + assertEquals(0, consumer.assignment().size()); + } + } + + @ClusterTest + public void testClassicConsumerSubsequentPatternSubscription() throws InterruptedException { + testSubsequentPatternSubscription(GroupProtocol.CLASSIC); + } + + @ClusterTest + public void testAsyncConsumerSubsequentPatternSubscription() throws InterruptedException { + testSubsequentPatternSubscription(GroupProtocol.CONSUMER); + } + + /** + * Verifies that a second call to pattern subscription succeeds and performs as expected. + * The initial subscription is to a pattern that matches two topics 'topic' and 'foo'. + * The second subscription is to a pattern that matches 'foo' and a new topic 'bar'. + * It is expected that the consumer is subscribed to all partitions of 'topic' and 'foo' after + * the first subscription, and to all partitions of 'foo' and 'bar' after the second. + * The metadata refresh interval is intentionally increased to a large enough value to guarantee + * that it is the subscription call that triggers a metadata refresh, and not the timeout. + */ + public void testSubsequentPatternSubscription(GroupProtocol groupProtocol) throws InterruptedException { + var numRecords = 10000; + Map config = Map.of( + MAX_POLL_INTERVAL_MS_CONFIG, 6000, + GROUP_PROTOCOL_CONFIG, groupProtocol.name().toLowerCase(Locale.ROOT), + ENABLE_AUTO_COMMIT_CONFIG, false, + METADATA_MAX_AGE_CONFIG, 30000 + ); + try (Consumer consumer = cluster.consumer(config); + Producer producer = cluster.producer() + ) { + sendRecords(producer, tp, numRecords, System.currentTimeMillis()); + + // the first topic ('topic') matches first subscription pattern only + var fooTopic = "foo"; // matches both subscription patterns + cluster.createTopic(fooTopic, 1, (short) BROKER_COUNT); + sendRecords(producer, new TopicPartition(fooTopic, 0), 1000, System.currentTimeMillis()); + + assertEquals(0, consumer.assignment().size()); + + var pattern = Pattern.compile(".*o.*"); // only 'topic' and 'foo' match this + consumer.subscribe(pattern, new TestConsumerReassignmentListener()); + + Set assignment = new HashSet<>(); + assignment.add(new TopicPartition(topic, 0)); + assignment.add(new TopicPartition(topic, 1)); + assignment.add(new TopicPartition(fooTopic, 0)); + + awaitAssignment(consumer, assignment); + + var barTopic = "bar"; // matches the next subscription pattern + cluster.createTopic(barTopic, 1, (short) BROKER_COUNT); + sendRecords(producer, new TopicPartition(barTopic, 0), 1000, System.currentTimeMillis()); + + var pattern2 = Pattern.compile("..."); // only 'foo' and 'bar' match this + consumer.subscribe(pattern2, new TestConsumerReassignmentListener()); + + // Remove topic partitions from assignment + assignment.remove(new TopicPartition(topic, 0)); + assignment.remove(new TopicPartition(topic, 1)); + + // Add bar topic partition to assignment + assignment.add(new TopicPartition(barTopic, 0)); + + awaitAssignment(consumer, assignment); + + consumer.unsubscribe(); + assertEquals(0, consumer.assignment().size()); + } + } + + @ClusterTest + public void testClassicConsumerPatternUnsubscription() throws InterruptedException { + testPatternUnsubscription(GroupProtocol.CLASSIC); + } + + @ClusterTest + public void testAsyncConsumerPatternUnsubscription() throws InterruptedException { + testPatternUnsubscription(GroupProtocol.CONSUMER); + } + + /** + * Verifies that pattern unsubscription performs as expected. + * The pattern matches the topics 'topic' and 'tblablac'. + * It is expected that the consumer is subscribed to all partitions of 'topic' and 'tblablac' after the subscription + * when metadata is refreshed. + * When consumer unsubscribes from all its subscriptions, it is expected that its assignments are cleared right away. + */ + public void testPatternUnsubscription(GroupProtocol groupProtocol) throws InterruptedException { + var numRecords = 10000; + Map config = Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol.name().toLowerCase(Locale.ROOT)); + try (Producer producer = cluster.producer(); + Consumer consumer = cluster.consumer(config) + ) { + sendRecords(producer, tp, numRecords, System.currentTimeMillis()); + + var topic1 = "tblablac"; // matches the subscription pattern + cluster.createTopic(topic1, 2, (short) BROKER_COUNT); + sendRecords(producer, new TopicPartition(topic1, 0), 1000, System.currentTimeMillis()); + sendRecords(producer, new TopicPartition(topic1, 1), 1000, System.currentTimeMillis()); + + assertEquals(0, consumer.assignment().size()); + + consumer.subscribe(Pattern.compile("t.*c"), new TestConsumerReassignmentListener()); + + Set assignment = Set.of( + new TopicPartition(topic, 0), + new TopicPartition(topic, 1), + new TopicPartition(topic1, 0), + new TopicPartition(topic1, 1) + ); + awaitAssignment(consumer, assignment); + + consumer.unsubscribe(); + assertEquals(0, consumer.assignment().size()); + } + } + + @ClusterTest + public void testAsyncConsumerRe2JPatternSubscription() throws InterruptedException { + Map config = Map.of(GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)); + try (Consumer consumer = cluster.consumer(config)) { + var topic1 = "tblablac"; // matches subscribed pattern + cluster.createTopic(topic1, 2, (short) BROKER_COUNT); + + var topic2 = "tblablak"; // does not match subscribed pattern + cluster.createTopic(topic2, 2, (short) BROKER_COUNT); + + var topic3 = "tblab1"; // does not match subscribed pattern + cluster.createTopic(topic3, 2, (short) BROKER_COUNT); + + assertEquals(0, consumer.assignment().size()); + var pattern = new SubscriptionPattern("t.*c"); + consumer.subscribe(pattern); + + Set assignment = Set.of( + new TopicPartition(topic, 0), + new TopicPartition(topic, 1), + new TopicPartition(topic1, 0), + new TopicPartition(topic1, 1) + ); + awaitAssignment(consumer, assignment); + + consumer.unsubscribe(); + assertEquals(0, consumer.assignment().size()); + // Subscribe to a different pattern to match topic2 (that did not match before) + pattern = new SubscriptionPattern(topic2 + ".*"); + consumer.subscribe(pattern); + + assignment = Set.of( + new TopicPartition(topic2, 0), + new TopicPartition(topic2, 1) + ); + awaitAssignment(consumer, assignment); + } + } + + @ClusterTest + public void testAsyncConsumerRe2JPatternSubscriptionFetch() throws InterruptedException { + Map config = Map.of(GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)); + try (Consumer consumer = cluster.consumer(config)) { + var topic1 = "topic1"; // matches subscribed pattern + cluster.createTopic(topic1, 2, (short) BROKER_COUNT); + assertEquals(0, consumer.assignment().size()); + + var pattern = new SubscriptionPattern("topic.*"); + consumer.subscribe(pattern); + + Set assignment = Set.of( + new TopicPartition(topic, 0), + new TopicPartition(topic, 1), + new TopicPartition(topic1, 0), + new TopicPartition(topic1, 1) + ); + awaitAssignment(consumer, assignment); + + var totalRecords = 10; + var startingTimestamp = System.currentTimeMillis(); + var tp = new TopicPartition(topic1, 0); + sendRecords(cluster, tp, totalRecords, startingTimestamp); + consumeAndVerifyRecords(consumer, tp, totalRecords, 0, 0, startingTimestamp); + } + } + + @ClusterTest + public void testAsyncConsumerRe2JPatternExpandSubscription() throws InterruptedException { + Map config = Map.of(GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)); + try (Consumer consumer = cluster.consumer(config)) { + var topic1 = "topic1"; // matches first pattern + cluster.createTopic(topic1, 2, (short) BROKER_COUNT); + + var topic2 = "topic2"; // does not match first pattern + cluster.createTopic(topic2, 2, (short) BROKER_COUNT); + + assertEquals(0, consumer.assignment().size()); + var pattern = new SubscriptionPattern("topic1.*"); + consumer.subscribe(pattern); + + Set assignment = Set.of( + new TopicPartition(topic1, 0), + new TopicPartition(topic1, 1) + ); + awaitAssignment(consumer, assignment); + + consumer.unsubscribe(); + assertEquals(0, consumer.assignment().size()); + + // Subscribe to a different pattern that should match + // the same topics the member already had plus new ones + pattern = new SubscriptionPattern("topic1|topic2"); + consumer.subscribe(pattern); + + Set expandedAssignment = new HashSet<>(assignment); + expandedAssignment.add(new TopicPartition(topic2, 0)); + expandedAssignment.add(new TopicPartition(topic2, 1)); + awaitAssignment(consumer, expandedAssignment); + } + } + + @ClusterTest + public void testRe2JPatternSubscriptionAndTopicSubscription() throws InterruptedException { + Map config = Map.of(GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)); + try (Consumer consumer = cluster.consumer(config)) { + var topic1 = "topic1"; // matches subscribed pattern + cluster.createTopic(topic1, 2, (short) BROKER_COUNT); + + var topic11 = "topic11"; // matches subscribed pattern + cluster.createTopic(topic11, 2, (short) BROKER_COUNT); + + var topic2 = "topic2"; // does not match subscribed pattern + cluster.createTopic(topic2, 2, (short) BROKER_COUNT); + + assertEquals(0, consumer.assignment().size()); + // Subscribe to pattern + var pattern = new SubscriptionPattern("topic1.*"); + consumer.subscribe(pattern); + + Set patternAssignment = Set.of( + new TopicPartition(topic1, 0), + new TopicPartition(topic1, 1), + new TopicPartition(topic11, 0), + new TopicPartition(topic11, 1) + ); + awaitAssignment(consumer, patternAssignment); + consumer.unsubscribe(); + assertEquals(0, consumer.assignment().size()); + + // Subscribe to explicit topic names + consumer.subscribe(List.of(topic2)); + + Set assignment = Set.of( + new TopicPartition(topic2, 0), + new TopicPartition(topic2, 1) + ); + awaitAssignment(consumer, assignment); + consumer.unsubscribe(); + + // Subscribe to pattern again + consumer.subscribe(pattern); + awaitAssignment(consumer, patternAssignment); + } + } + + + @ClusterTest + public void testRe2JPatternSubscriptionInvalidRegex() throws InterruptedException { + Map config = Map.of(GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)); + try (Consumer consumer = cluster.consumer(config)) { + assertEquals(0, consumer.assignment().size()); + + var pattern = new SubscriptionPattern("(t.*c"); + consumer.subscribe(pattern); + + waitForPollThrowException(consumer, InvalidRegularExpression.class); + consumer.unsubscribe(); + } + } + + @ClusterTest + public void testClassicConsumerExpandingTopicSubscriptions() throws InterruptedException { + testExpandingTopicSubscriptions(GroupProtocol.CLASSIC); + } + + @ClusterTest + public void testAsyncConsumerExpandingTopicSubscriptions() throws InterruptedException { + testExpandingTopicSubscriptions(GroupProtocol.CONSUMER); + } + + public void testExpandingTopicSubscriptions(GroupProtocol groupProtocol) throws InterruptedException { + Map config = Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol.name().toLowerCase(Locale.ROOT)); + try (Consumer consumer = cluster.consumer(config)) { + var otherTopic = "other"; + + Set initialAssignment = Set.of( + new TopicPartition(topic, 0), + new TopicPartition(topic, 1) + ); + consumer.subscribe(List.of(topic)); + awaitAssignment(consumer, initialAssignment); + + cluster.createTopic(otherTopic, 2, (short) BROKER_COUNT); + + Set expandedAssignment = new HashSet<>(initialAssignment); + expandedAssignment.add(new TopicPartition(otherTopic, 0)); + expandedAssignment.add(new TopicPartition(otherTopic, 1)); + + consumer.subscribe(List.of(topic, otherTopic)); + awaitAssignment(consumer, expandedAssignment); + } + } + + @ClusterTest + public void testClassicConsumerShrinkingTopicSubscriptions() throws InterruptedException { + testShrinkingTopicSubscriptions(GroupProtocol.CLASSIC); + } + + @ClusterTest + public void testAsyncConsumerShrinkingTopicSubscriptions() throws InterruptedException { + testShrinkingTopicSubscriptions(GroupProtocol.CONSUMER); + } + + public void testShrinkingTopicSubscriptions(GroupProtocol groupProtocol) throws InterruptedException { + Map config = Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol.name().toLowerCase(Locale.ROOT)); + try (Consumer consumer = cluster.consumer(config)) { + var otherTopic = "other"; + cluster.createTopic(otherTopic, 2, (short) BROKER_COUNT); + + Set initialAssignment = Set.of( + new TopicPartition(topic, 0), + new TopicPartition(topic, 1), + new TopicPartition(otherTopic, 0), + new TopicPartition(otherTopic, 1) + ); + consumer.subscribe(List.of(topic, otherTopic)); + awaitAssignment(consumer, initialAssignment); + + Set shrunkenAssignment = Set.of( + new TopicPartition(topic, 0), + new TopicPartition(topic, 1) + ); + consumer.subscribe(List.of(topic)); + awaitAssignment(consumer, shrunkenAssignment); + } + } + + @ClusterTest + public void testClassicConsumerUnsubscribeTopic() throws InterruptedException { + testUnsubscribeTopic(Map.of( + GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT), + SESSION_TIMEOUT_MS_CONFIG, 100, // timeout quickly to avoid slow test + HEARTBEAT_INTERVAL_MS_CONFIG, 30 + )); + } + + @ClusterTest + public void testAsyncConsumerUnsubscribeTopic() throws InterruptedException { + testUnsubscribeTopic(Map.of(GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT))); + } + + public void testUnsubscribeTopic(Map config) throws InterruptedException { + try (Consumer consumer = cluster.consumer(config)) { + var listener = new TestConsumerReassignmentListener(); + consumer.subscribe(List.of(topic), listener); + + // the initial subscription should cause a callback execution + awaitRebalance(consumer, listener); + + consumer.subscribe(List.of()); + assertEquals(0, consumer.assignment().size()); + } + } + + @ClusterTest + public void testClassicConsumerSubscribeInvalidTopicCanUnsubscribe() throws InterruptedException { + testSubscribeInvalidTopicCanUnsubscribe(GroupProtocol.CLASSIC); + } + + @ClusterTest + public void testAsyncConsumerClassicConsumerSubscribeInvalidTopicCanUnsubscribe() throws InterruptedException { + testSubscribeInvalidTopicCanUnsubscribe(GroupProtocol.CONSUMER); + } + + public void testSubscribeInvalidTopicCanUnsubscribe(GroupProtocol groupProtocol) throws InterruptedException { + Map config = Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol.name().toLowerCase(Locale.ROOT)); + try (Consumer consumer = cluster.consumer(config)) { + setupSubscribeInvalidTopic(consumer); + assertDoesNotThrow(consumer::unsubscribe); + } + } + + @ClusterTest + public void testClassicConsumerSubscribeInvalidTopicCanClose() throws InterruptedException { + testSubscribeInvalidTopicCanClose(GroupProtocol.CLASSIC); + } + + @ClusterTest + public void testAsyncConsumerSubscribeInvalidTopicCanClose() throws InterruptedException { + testSubscribeInvalidTopicCanClose(GroupProtocol.CONSUMER); + } + + public void testSubscribeInvalidTopicCanClose(GroupProtocol groupProtocol) throws InterruptedException { + Map config = Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol.name().toLowerCase(Locale.ROOT)); + try (Consumer consumer = cluster.consumer(config)) { + setupSubscribeInvalidTopic(consumer); + assertDoesNotThrow(() -> consumer.close()); + } + } + + private void setupSubscribeInvalidTopic(Consumer consumer) throws InterruptedException { + // Invalid topic name due to space + var invalidTopicName = "topic abc"; + consumer.subscribe(List.of(invalidTopicName)); + + InvalidTopicException[] exception = {null}; + TestUtils.waitForCondition(() -> { + try { + consumer.poll(Duration.ofMillis(500)); + } catch (InvalidTopicException e) { + exception[0] = e; + } catch (Throwable e) { + fail("An InvalidTopicException should be thrown. But " + e.getClass() + " is thrown"); + } + return exception[0] != null; + }, 5000, "An InvalidTopicException should be thrown."); + + assertEquals("Invalid topics: [" + invalidTopicName + "]", exception[0].getMessage()); + } +} diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala deleted file mode 100644 index 776c1aef961..00000000000 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala +++ /dev/null @@ -1,422 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package kafka.api - -import kafka.utils.{TestInfoUtils, TestUtils} -import org.apache.kafka.clients.consumer._ -import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.errors.{InvalidRegularExpression, InvalidTopicException} -import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.Timeout -import org.junit.jupiter.api.function.Executable -import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.MethodSource - -import java.time.Duration -import java.util.regex.Pattern - -/** - * Integration tests for the consumer that covers the subscribe and unsubscribe logic. - */ -@Timeout(600) -class PlaintextConsumerSubscriptionTest extends AbstractConsumerTest { - - /** - * Verifies that pattern subscription performs as expected. - * The pattern matches the topics 'topic' and 'tblablac', but not 'tblablak' or 'tblab1'. - * It is expected that the consumer is subscribed to all partitions of 'topic' and - * 'tblablac' after the subscription when metadata is refreshed. - * When a new topic 'tsomec' is added afterwards, it is expected that upon the next - * metadata refresh the consumer becomes subscribed to this new topic and all partitions - * of that topic are assigned to it. - */ - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testPatternSubscription(groupProtocol: String): Unit = { - val numRecords = 10000 - val producer = createProducer() - sendRecords(producer, numRecords, tp) - - val topic1 = "tblablac" // matches subscribed pattern - createTopic(topic1, 2, brokerCount) - sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 0)) - sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 1)) - - val topic2 = "tblablak" // does not match subscribed pattern - createTopic(topic2, 2, brokerCount) - sendRecords(producer, numRecords = 1000, new TopicPartition(topic2, 0)) - sendRecords(producer, numRecords = 1000, new TopicPartition(topic2, 1)) - - val topic3 = "tblab1" // does not match subscribed pattern - createTopic(topic3, 2, brokerCount) - sendRecords(producer, numRecords = 1000, new TopicPartition(topic3, 0)) - sendRecords(producer, numRecords = 1000, new TopicPartition(topic3, 1)) - - val consumer = createConsumer() - assertEquals(0, consumer.assignment().size) - - val pattern = Pattern.compile("t.*c") - consumer.subscribe(pattern, new TestConsumerReassignmentListener) - - var assignment = Set( - new TopicPartition(topic, 0), - new TopicPartition(topic, 1), - new TopicPartition(topic1, 0), - new TopicPartition(topic1, 1)) - awaitAssignment(consumer, assignment) - - val topic4 = "tsomec" // matches subscribed pattern - createTopic(topic4, 2, brokerCount) - sendRecords(producer, numRecords = 1000, new TopicPartition(topic4, 0)) - sendRecords(producer, numRecords = 1000, new TopicPartition(topic4, 1)) - - assignment ++= Set( - new TopicPartition(topic4, 0), - new TopicPartition(topic4, 1)) - awaitAssignment(consumer, assignment) - - consumer.unsubscribe() - assertEquals(0, consumer.assignment().size) - } - - /** - * Verifies that a second call to pattern subscription succeeds and performs as expected. - * The initial subscription is to a pattern that matches two topics 'topic' and 'foo'. - * The second subscription is to a pattern that matches 'foo' and a new topic 'bar'. - * It is expected that the consumer is subscribed to all partitions of 'topic' and 'foo' after - * the first subscription, and to all partitions of 'foo' and 'bar' after the second. - * The metadata refresh interval is intentionally increased to a large enough value to guarantee - * that it is the subscription call that triggers a metadata refresh, and not the timeout. - */ - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testSubsequentPatternSubscription(groupProtocol: String): Unit = { - this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "30000") - val consumer = createConsumer() - - val numRecords = 10000 - val producer = createProducer() - sendRecords(producer, numRecords = numRecords, tp) - - // the first topic ('topic') matches first subscription pattern only - - val fooTopic = "foo" // matches both subscription patterns - createTopic(fooTopic, 1, brokerCount) - sendRecords(producer, numRecords = 1000, new TopicPartition(fooTopic, 0)) - - assertEquals(0, consumer.assignment().size) - - val pattern1 = Pattern.compile(".*o.*") // only 'topic' and 'foo' match this - consumer.subscribe(pattern1, new TestConsumerReassignmentListener) - - var assignment = Set( - new TopicPartition(topic, 0), - new TopicPartition(topic, 1), - new TopicPartition(fooTopic, 0)) - awaitAssignment(consumer, assignment) - - val barTopic = "bar" // matches the next subscription pattern - createTopic(barTopic, 1, brokerCount) - sendRecords(producer, numRecords = 1000, new TopicPartition(barTopic, 0)) - - val pattern2 = Pattern.compile("...") // only 'foo' and 'bar' match this - consumer.subscribe(pattern2, new TestConsumerReassignmentListener) - assignment --= Set( - new TopicPartition(topic, 0), - new TopicPartition(topic, 1)) - assignment ++= Set( - new TopicPartition(barTopic, 0)) - awaitAssignment(consumer, assignment) - - consumer.unsubscribe() - assertEquals(0, consumer.assignment().size) - } - - /** - * Verifies that pattern unsubscription performs as expected. - * The pattern matches the topics 'topic' and 'tblablac'. - * It is expected that the consumer is subscribed to all partitions of 'topic' and - * 'tblablac' after the subscription when metadata is refreshed. - * When consumer unsubscribes from all its subscriptions, it is expected that its - * assignments are cleared right away. - */ - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testPatternUnsubscription(groupProtocol: String): Unit = { - val numRecords = 10000 - val producer = createProducer() - sendRecords(producer, numRecords, tp) - - val topic1 = "tblablac" // matches the subscription pattern - createTopic(topic1, 2, brokerCount) - sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 0)) - sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 1)) - - val consumer = createConsumer() - assertEquals(0, consumer.assignment().size) - - consumer.subscribe(Pattern.compile("t.*c"), new TestConsumerReassignmentListener) - val assignment = Set( - new TopicPartition(topic, 0), - new TopicPartition(topic, 1), - new TopicPartition(topic1, 0), - new TopicPartition(topic1, 1)) - awaitAssignment(consumer, assignment) - - consumer.unsubscribe() - assertEquals(0, consumer.assignment().size) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersConsumerGroupProtocolOnly")) - def testRe2JPatternSubscription(groupProtocol: String): Unit = { - val topic1 = "tblablac" // matches subscribed pattern - createTopic(topic1, 2, brokerCount) - - val topic2 = "tblablak" // does not match subscribed pattern - createTopic(topic2, 2, brokerCount) - - val topic3 = "tblab1" // does not match subscribed pattern - createTopic(topic3, 2, brokerCount) - - val consumer = createConsumer() - assertEquals(0, consumer.assignment().size) - - var pattern = new SubscriptionPattern("t.*c") - consumer.subscribe(pattern) - - var assignment = Set( - new TopicPartition(topic, 0), - new TopicPartition(topic, 1), - new TopicPartition(topic1, 0), - new TopicPartition(topic1, 1)) - awaitAssignment(consumer, assignment) - consumer.unsubscribe() - assertEquals(0, consumer.assignment().size) - - // Subscribe to a different pattern to match topic2 (that did not match before) - pattern = new SubscriptionPattern(topic2 + ".*") - consumer.subscribe(pattern) - - assignment = Set( - new TopicPartition(topic2, 0), - new TopicPartition(topic2, 1)) - awaitAssignment(consumer, assignment) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersConsumerGroupProtocolOnly")) - def testRe2JPatternSubscriptionFetch(groupProtocol: String): Unit = { - val topic1 = "topic1" // matches subscribed pattern - createTopic(topic1, 2, brokerCount) - - val consumer = createConsumer() - assertEquals(0, consumer.assignment().size) - - val pattern = new SubscriptionPattern("topic.*") - consumer.subscribe(pattern) - - val assignment = Set( - new TopicPartition(topic, 0), - new TopicPartition(topic, 1), - new TopicPartition(topic1, 0), - new TopicPartition(topic1, 1)) - awaitAssignment(consumer, assignment) - - val producer = createProducer() - val totalRecords = 10L - val startingTimestamp = System.currentTimeMillis() - val tp = new TopicPartition(topic1, 0) - sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = startingTimestamp) - consumeAndVerifyRecords(consumer = consumer, numRecords = totalRecords.toInt, startingOffset = 0, startingTimestamp = startingTimestamp, tp = tp) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersConsumerGroupProtocolOnly")) - def testRe2JPatternExpandSubscription(groupProtocol: String): Unit = { - val topic1 = "topic1" // matches first pattern - createTopic(topic1, 2, brokerCount) - - val topic2 = "topic2" // does not match first pattern - createTopic(topic2, 2, brokerCount) - - val consumer = createConsumer() - assertEquals(0, consumer.assignment().size) - - var pattern = new SubscriptionPattern("topic1.*") - consumer.subscribe(pattern) - val assignment = Set( - new TopicPartition(topic1, 0), - new TopicPartition(topic1, 1)) - awaitAssignment(consumer, assignment) - - consumer.unsubscribe() - assertEquals(0, consumer.assignment().size) - - // Subscribe to a different pattern that should match - // the same topics the member already had plus new ones - pattern = new SubscriptionPattern("topic1|topic2") - consumer.subscribe(pattern) - - val expandedAssignment = assignment ++ Set(new TopicPartition(topic2, 0), new TopicPartition(topic2, 1)) - awaitAssignment(consumer, expandedAssignment) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersConsumerGroupProtocolOnly")) - def testRe2JPatternSubscriptionAndTopicSubscription(groupProtocol: String): Unit = { - val topic1 = "topic1" // matches subscribed pattern - createTopic(topic1, 2, brokerCount) - - val topic11 = "topic11" // matches subscribed pattern - createTopic(topic11, 2, brokerCount) - - val topic2 = "topic2" // does not match subscribed pattern - createTopic(topic2, 2, brokerCount) - - val consumer = createConsumer() - assertEquals(0, consumer.assignment().size) - - // Subscribe to pattern - val pattern = new SubscriptionPattern("topic1.*") - consumer.subscribe(pattern) - val patternAssignment = Set( - new TopicPartition(topic1, 0), - new TopicPartition(topic1, 1), - new TopicPartition(topic11, 0), - new TopicPartition(topic11, 1)) - awaitAssignment(consumer, patternAssignment) - consumer.unsubscribe() - assertEquals(0, consumer.assignment().size) - - // Subscribe to explicit topic names - consumer.subscribe(java.util.List.of(topic2)) - val assignment = Set( - new TopicPartition(topic2, 0), - new TopicPartition(topic2, 1)) - awaitAssignment(consumer, assignment) - consumer.unsubscribe() - - // Subscribe to pattern again - consumer.subscribe(pattern) - awaitAssignment(consumer, patternAssignment) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersConsumerGroupProtocolOnly")) - def testRe2JPatternSubscriptionInvalidRegex(groupProtocol: String): Unit = { - val consumer = createConsumer() - assertEquals(0, consumer.assignment().size) - - val pattern = new SubscriptionPattern("(t.*c") - consumer.subscribe(pattern) - - TestUtils.tryUntilNoAssertionError() { - assertThrows(classOf[InvalidRegularExpression], () => consumer.poll(Duration.ZERO)) - } - consumer.unsubscribe() - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testExpandingTopicSubscriptions(groupProtocol: String): Unit = { - val otherTopic = "other" - val initialAssignment = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1)) - val consumer = createConsumer() - consumer.subscribe(java.util.List.of(topic)) - awaitAssignment(consumer, initialAssignment) - - createTopic(otherTopic, 2, brokerCount) - val expandedAssignment = initialAssignment ++ Set(new TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1)) - consumer.subscribe(java.util.List.of(topic, otherTopic)) - awaitAssignment(consumer, expandedAssignment) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testShrinkingTopicSubscriptions(groupProtocol: String): Unit = { - val otherTopic = "other" - createTopic(otherTopic, 2, brokerCount) - val initialAssignment = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1)) - val consumer = createConsumer() - consumer.subscribe(java.util.List.of(topic, otherTopic)) - awaitAssignment(consumer, initialAssignment) - - val shrunkenAssignment = Set(new TopicPartition(topic, 0), new TopicPartition(topic, 1)) - consumer.subscribe(java.util.List.of(topic)) - awaitAssignment(consumer, shrunkenAssignment) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testUnsubscribeTopic(groupProtocol: String): Unit = { - if (groupProtocol.equalsIgnoreCase(GroupProtocol.CLASSIC.name)) { - this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test - this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30") - } - val consumer = createConsumer() - - val listener = new TestConsumerReassignmentListener() - consumer.subscribe(java.util.List.of(topic), listener) - - // the initial subscription should cause a callback execution - awaitRebalance(consumer, listener) - - consumer.subscribe(java.util.List.of[String]()) - assertEquals(0, consumer.assignment.size()) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testSubscribeInvalidTopicCanUnsubscribe(groupProtocol: String): Unit = { - val consumer = createConsumer() - - setupSubscribeInvalidTopic(consumer) - if(groupProtocol == "consumer") { - // Must ensure memberId is not empty before sending leave group heartbeat. This is a temporary solution before KIP-1082. - TestUtils.waitUntilTrue(() => consumer.groupMetadata().memberId().nonEmpty, - waitTimeMs = 30000, msg = "Timeout waiting for first consumer group heartbeat response") - } - assertDoesNotThrow(new Executable { - override def execute(): Unit = consumer.unsubscribe() - }) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testSubscribeInvalidTopicCanClose(groupProtocol: String): Unit = { - val consumer = createConsumer() - - setupSubscribeInvalidTopic(consumer) - assertDoesNotThrow(new Executable { - override def execute(): Unit = consumer.close() - }) - } - - def setupSubscribeInvalidTopic(consumer: Consumer[Array[Byte], Array[Byte]]): Unit = { - // Invalid topic name due to space - val invalidTopicName = "topic abc" - consumer.subscribe(java.util.List.of(invalidTopicName)) - - var exception : InvalidTopicException = null - TestUtils.waitUntilTrue(() => { - try consumer.poll(Duration.ofMillis(500)) catch { - case e : InvalidTopicException => exception = e - case e : Throwable => fail(s"An InvalidTopicException should be thrown. But ${e.getClass} is thrown") - } - exception != null - }, waitTimeMs = 5000, msg = "An InvalidTopicException should be thrown.") - - assertEquals(s"Invalid topics: [$invalidTopicName]", exception.getMessage) - } -}