mirror of https://github.com/apache/kafka.git
KAFKA-18690: Keep leader metadata for RE2J-assigned partitions (#18777)
Reviewers: Lianet Magrans <lmagrans@confluent.io>
This commit is contained in:
parent
8e3a001bf5
commit
42e7cbb67e
|
@ -94,6 +94,6 @@ public class ConsumerMetadata extends Metadata {
|
|||
if (isInternal && !includeInternalTopics)
|
||||
return false;
|
||||
|
||||
return subscription.matchesSubscribedPattern(topic);
|
||||
return subscription.matchesSubscribedPattern(topic) || subscription.isAssignedFromRe2j(topic);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -490,6 +490,20 @@ public class SubscriptionState {
|
|||
|| this.subscriptionType == SubscriptionType.AUTO_TOPICS_SHARE || this.subscriptionType == SubscriptionType.AUTO_PATTERN_RE2J;
|
||||
}
|
||||
|
||||
public synchronized boolean isAssignedFromRe2j(String topic) {
|
||||
if (!hasRe2JPatternSubscription()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
for (TopicPartition topicPartition : assignment.partitionSet()) {
|
||||
if (topicPartition.topic().equals(topic)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
public synchronized void position(TopicPartition tp, FetchPosition position) {
|
||||
assignedState(tp).position(position);
|
||||
}
|
||||
|
|
|
@ -215,6 +215,33 @@ class PlaintextConsumerSubscriptionTest extends AbstractConsumerTest {
|
|||
awaitAssignment(consumer, assignment)
|
||||
}
|
||||
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
|
||||
def testRe2JPatternSubscriptionFetch(quorum: String, groupProtocol: String): Unit = {
|
||||
val topic1 = "topic1" // matches subscribed pattern
|
||||
createTopic(topic1, 2, brokerCount)
|
||||
|
||||
val consumer = createConsumer()
|
||||
assertEquals(0, consumer.assignment().size)
|
||||
|
||||
val pattern = new SubscriptionPattern("topic.*")
|
||||
consumer.subscribe(pattern)
|
||||
|
||||
val assignment = Set(
|
||||
new TopicPartition(topic, 0),
|
||||
new TopicPartition(topic, 1),
|
||||
new TopicPartition(topic1, 0),
|
||||
new TopicPartition(topic1, 1))
|
||||
awaitAssignment(consumer, assignment)
|
||||
|
||||
val producer = createProducer()
|
||||
val totalRecords = 10L
|
||||
val startingTimestamp = System.currentTimeMillis()
|
||||
val tp = new TopicPartition(topic1, 0)
|
||||
sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = startingTimestamp)
|
||||
consumeAndVerifyRecords(consumer = consumer, numRecords = totalRecords.toInt, startingOffset = 0, startingTimestamp = startingTimestamp, tp = tp)
|
||||
}
|
||||
|
||||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
|
||||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
|
||||
def testRe2JPatternExpandSubscription(quorum: String, groupProtocol: String): Unit = {
|
||||
|
|
Loading…
Reference in New Issue