mirror of https://github.com/apache/kafka.git
regex integration tests (#18079)
Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
parent
c8380ae779
commit
b99c22770a
|
@ -193,10 +193,10 @@ class PlaintextConsumerSubscriptionTest extends AbstractConsumerTest {
|
|||
val consumer = createConsumer()
|
||||
assertEquals(0, consumer.assignment().size)
|
||||
|
||||
val pattern = new SubscriptionPattern("t.*c")
|
||||
var pattern = new SubscriptionPattern("t.*c")
|
||||
consumer.subscribe(pattern)
|
||||
|
||||
val assignment = Set(
|
||||
var assignment = Set(
|
||||
new TopicPartition(topic, 0),
|
||||
new TopicPartition(topic, 1),
|
||||
new TopicPartition(topic1, 0),
|
||||
|
@ -204,6 +204,86 @@ class PlaintextConsumerSubscriptionTest extends AbstractConsumerTest {
|
|||
awaitAssignment(consumer, assignment)
|
||||
consumer.unsubscribe()
|
||||
assertEquals(0, consumer.assignment().size)
|
||||
|
||||
// Subscribe to a different pattern to match topic2 (that did not match before)
|
||||
pattern = new SubscriptionPattern(topic2 + ".*")
|
||||
consumer.subscribe(pattern)
|
||||
|
||||
assignment = Set(
|
||||
new TopicPartition(topic2, 0),
|
||||
new TopicPartition(topic2, 1))
|
||||
awaitAssignment(consumer, assignment)
|
||||
}
|
||||
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
|
||||
def testRe2JPatternExpandSubscription(quorum: String, groupProtocol: String): Unit = {
|
||||
val topic1 = "topic1" // matches first pattern
|
||||
createTopic(topic1, 2, brokerCount)
|
||||
|
||||
val topic2 = "topic2" // does not match first pattern
|
||||
createTopic(topic2, 2, brokerCount)
|
||||
|
||||
val consumer = createConsumer()
|
||||
assertEquals(0, consumer.assignment().size)
|
||||
|
||||
var pattern = new SubscriptionPattern("topic1.*")
|
||||
consumer.subscribe(pattern)
|
||||
val assignment = Set(
|
||||
new TopicPartition(topic1, 0),
|
||||
new TopicPartition(topic1, 1))
|
||||
awaitAssignment(consumer, assignment)
|
||||
|
||||
consumer.unsubscribe()
|
||||
assertEquals(0, consumer.assignment().size)
|
||||
|
||||
// Subscribe to a different pattern that should match
|
||||
// the same topics the member already had plus new ones
|
||||
pattern = new SubscriptionPattern("topic1|topic2")
|
||||
consumer.subscribe(pattern)
|
||||
|
||||
val expandedAssignment = assignment ++ Set(new TopicPartition(topic2, 0), new TopicPartition(topic2, 1))
|
||||
awaitAssignment(consumer, expandedAssignment)
|
||||
}
|
||||
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
|
||||
def testRe2JPatternSubscriptionAndTopicSubscription(quorum: String, groupProtocol: String): Unit = {
|
||||
val topic1 = "topic1" // matches subscribed pattern
|
||||
createTopic(topic1, 2, brokerCount)
|
||||
|
||||
val topic11 = "topic11" // matches subscribed pattern
|
||||
createTopic(topic11, 2, brokerCount)
|
||||
|
||||
val topic2 = "topic2" // does not match subscribed pattern
|
||||
createTopic(topic2, 2, brokerCount)
|
||||
|
||||
val consumer = createConsumer()
|
||||
assertEquals(0, consumer.assignment().size)
|
||||
|
||||
// Subscribe to pattern
|
||||
val pattern = new SubscriptionPattern("topic1.*")
|
||||
consumer.subscribe(pattern)
|
||||
val patternAssignment = Set(
|
||||
new TopicPartition(topic1, 0),
|
||||
new TopicPartition(topic1, 1),
|
||||
new TopicPartition(topic11, 0),
|
||||
new TopicPartition(topic11, 1))
|
||||
awaitAssignment(consumer, patternAssignment)
|
||||
consumer.unsubscribe()
|
||||
assertEquals(0, consumer.assignment().size)
|
||||
|
||||
// Subscribe to explicit topic names
|
||||
consumer.subscribe(List(topic2).asJava)
|
||||
val assignment = Set(
|
||||
new TopicPartition(topic2, 0),
|
||||
new TopicPartition(topic2, 1))
|
||||
awaitAssignment(consumer, assignment)
|
||||
consumer.unsubscribe()
|
||||
|
||||
// Subscribe to pattern again
|
||||
consumer.subscribe(pattern)
|
||||
awaitAssignment(consumer, patternAssignment)
|
||||
}
|
||||
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
|
||||
|
|
Loading…
Reference in New Issue