From 42e7cbb67ed06a0c3c2a2a8251046f439dff3f95 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Tue, 4 Feb 2025 18:22:28 +0000 Subject: [PATCH] KAFKA-18690: Keep leader metadata for RE2J-assigned partitions (#18777) Reviewers: Lianet Magrans --- .../consumer/internals/ConsumerMetadata.java | 2 +- .../consumer/internals/SubscriptionState.java | 14 ++++++++++ .../PlaintextConsumerSubscriptionTest.scala | 27 +++++++++++++++++++ 3 files changed, 42 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java index cb4c7dde6f8..434e989f068 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java @@ -94,6 +94,6 @@ public class ConsumerMetadata extends Metadata { if (isInternal && !includeInternalTopics) return false; - return subscription.matchesSubscribedPattern(topic); + return subscription.matchesSubscribedPattern(topic) || subscription.isAssignedFromRe2j(topic); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index bd45e71c884..e237165f5b7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -490,6 +490,20 @@ public class SubscriptionState { || this.subscriptionType == SubscriptionType.AUTO_TOPICS_SHARE || this.subscriptionType == SubscriptionType.AUTO_PATTERN_RE2J; } + public synchronized boolean isAssignedFromRe2j(String topic) { + if (!hasRe2JPatternSubscription()) { + return false; + } + + for (TopicPartition topicPartition : assignment.partitionSet()) { + if (topicPartition.topic().equals(topic)) { + return true; + } + } + + return false; + } + public synchronized void position(TopicPartition tp, FetchPosition position) { assignedState(tp).position(position); } diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala index 5eea54b23d1..70abc3f8412 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala @@ -215,6 +215,33 @@ class PlaintextConsumerSubscriptionTest extends AbstractConsumerTest { awaitAssignment(consumer, assignment) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) + def testRe2JPatternSubscriptionFetch(quorum: String, groupProtocol: String): Unit = { + val topic1 = "topic1" // matches subscribed pattern + createTopic(topic1, 2, brokerCount) + + val consumer = createConsumer() + assertEquals(0, consumer.assignment().size) + + val pattern = new SubscriptionPattern("topic.*") + consumer.subscribe(pattern) + + val assignment = Set( + new TopicPartition(topic, 0), + new TopicPartition(topic, 1), + new TopicPartition(topic1, 0), + new TopicPartition(topic1, 1)) + awaitAssignment(consumer, assignment) + + val producer = createProducer() + val totalRecords = 10L + val startingTimestamp = System.currentTimeMillis() + val tp = new TopicPartition(topic1, 0) + sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = startingTimestamp) + consumeAndVerifyRecords(consumer = consumer, numRecords = totalRecords.toInt, startingOffset = 0, startingTimestamp = startingTimestamp, tp = tp) + } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) def testRe2JPatternExpandSubscription(quorum: String, groupProtocol: String): Unit = {