From b99c22770aafb873935d6daed4b942d326c05aba Mon Sep 17 00:00:00 2001 From: Lianet Magrans <98415067+lianetm@users.noreply.github.com> Date: Tue, 10 Dec 2024 09:01:02 -0500 Subject: [PATCH] regex integration tests (#18079) Reviewers: David Jacot --- .../PlaintextConsumerSubscriptionTest.scala | 84 ++++++++++++++++++- 1 file changed, 82 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala index f9c6d18b470..5eea54b23d1 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala @@ -193,10 +193,10 @@ class PlaintextConsumerSubscriptionTest extends AbstractConsumerTest { val consumer = createConsumer() assertEquals(0, consumer.assignment().size) - val pattern = new SubscriptionPattern("t.*c") + var pattern = new SubscriptionPattern("t.*c") consumer.subscribe(pattern) - val assignment = Set( + var assignment = Set( new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(topic1, 0), @@ -204,6 +204,86 @@ class PlaintextConsumerSubscriptionTest extends AbstractConsumerTest { awaitAssignment(consumer, assignment) consumer.unsubscribe() assertEquals(0, consumer.assignment().size) + + // Subscribe to a different pattern to match topic2 (that did not match before) + pattern = new SubscriptionPattern(topic2 + ".*") + consumer.subscribe(pattern) + + assignment = Set( + new TopicPartition(topic2, 0), + new TopicPartition(topic2, 1)) + awaitAssignment(consumer, assignment) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) + def testRe2JPatternExpandSubscription(quorum: String, groupProtocol: String): Unit = { + val topic1 = "topic1" // matches first pattern + createTopic(topic1, 2, brokerCount) + + val topic2 = "topic2" // does not match first pattern + createTopic(topic2, 2, brokerCount) + + val consumer = createConsumer() + assertEquals(0, consumer.assignment().size) + + var pattern = new SubscriptionPattern("topic1.*") + consumer.subscribe(pattern) + val assignment = Set( + new TopicPartition(topic1, 0), + new TopicPartition(topic1, 1)) + awaitAssignment(consumer, assignment) + + consumer.unsubscribe() + assertEquals(0, consumer.assignment().size) + + // Subscribe to a different pattern that should match + // the same topics the member already had plus new ones + pattern = new SubscriptionPattern("topic1|topic2") + consumer.subscribe(pattern) + + val expandedAssignment = assignment ++ Set(new TopicPartition(topic2, 0), new TopicPartition(topic2, 1)) + awaitAssignment(consumer, expandedAssignment) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) + def testRe2JPatternSubscriptionAndTopicSubscription(quorum: String, groupProtocol: String): Unit = { + val topic1 = "topic1" // matches subscribed pattern + createTopic(topic1, 2, brokerCount) + + val topic11 = "topic11" // matches subscribed pattern + createTopic(topic11, 2, brokerCount) + + val topic2 = "topic2" // does not match subscribed pattern + createTopic(topic2, 2, brokerCount) + + val consumer = createConsumer() + assertEquals(0, consumer.assignment().size) + + // Subscribe to pattern + val pattern = new SubscriptionPattern("topic1.*") + consumer.subscribe(pattern) + val patternAssignment = Set( + new TopicPartition(topic1, 0), + new TopicPartition(topic1, 1), + new TopicPartition(topic11, 0), + new TopicPartition(topic11, 1)) + awaitAssignment(consumer, patternAssignment) + consumer.unsubscribe() + assertEquals(0, consumer.assignment().size) + + // Subscribe to explicit topic names + consumer.subscribe(List(topic2).asJava) + val assignment = Set( + new TopicPartition(topic2, 0), + new TopicPartition(topic2, 1)) + awaitAssignment(consumer, assignment) + consumer.unsubscribe() + + // Subscribe to pattern again + consumer.subscribe(pattern) + awaitAssignment(consumer, patternAssignment) } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)