KAFKA-18916; Resolved regular expressions must update the group by topics data structure (#19088)

When regular expressions are resolved, they do not update the group by
topics data structure. Hence, topic changes (e.g. deletion) do not
trigger a rebalance of the group.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
David Jacot 2025-03-04 15:31:08 +01:00 committed by GitHub
parent 101e15bb1c
commit 1df4a42b40
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 58 additions and 11 deletions

View File

@ -20,7 +20,7 @@ import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterFeature,
import kafka.utils.TestUtils import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.AlterConfigOp.OpType import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry} import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.Uuid import org.apache.kafka.common.{TopicCollection, Uuid}
import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData} import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData}
import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.protocol.Errors
@ -174,7 +174,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
) )
// Heartbeat request to join the group. Note that the member subscribes // Heartbeat request to join the group. Note that the member subscribes
// to an nonexistent topic. // to a nonexistent topic.
var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData() new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp") .setGroupId("grp")
@ -214,7 +214,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
).build() ).build()
// This is the expected assignment. // This is the expected assignment.
val expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment() var expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(List(new ConsumerGroupHeartbeatResponseData.TopicPartitions() .setTopicPartitions(List(new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(topicId) .setTopicId(topicId)
.setPartitions(List[Integer](0, 1, 2).asJava)).asJava) .setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
@ -230,6 +230,32 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
// Verify the response. // Verify the response.
assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch) assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment) assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment)
// Delete the topic.
admin.deleteTopics(TopicCollection.ofTopicIds(List(topicId).asJava)).all.get
// Prepare the next heartbeat.
consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
new ConsumerGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(consumerGroupHeartbeatResponse.data.memberId)
.setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch)
).build()
// This is the expected assignment.
expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment()
// Heartbeats until the partitions are revoked.
consumerGroupHeartbeatResponse = null
TestUtils.waitUntilTrue(() => {
consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
consumerGroupHeartbeatResponse.data.assignment == expectedAssignment
}, msg = s"Could not get partitions revoked. Last response $consumerGroupHeartbeatResponse.")
// Verify the response.
assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch)
assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment)
} finally { } finally {
admin.close() admin.close()
} }

View File

@ -4308,9 +4308,18 @@ public class GroupMetadataManager {
String groupId = key.groupId(); String groupId = key.groupId();
String regex = key.regularExpression(); String regex = key.regularExpression();
ConsumerGroup consumerGroup;
try {
consumerGroup = getOrMaybeCreatePersistedConsumerGroup(groupId, value != null);
} catch (GroupIdNotFoundException ex) {
// If the group does not exist and a tombstone is replayed, we can ignore it.
return;
}
Set<String> oldSubscribedTopicNames = new HashSet<>(consumerGroup.subscribedTopicNames().keySet());
if (value != null) { if (value != null) {
ConsumerGroup group = getOrMaybeCreatePersistedConsumerGroup(groupId, true); consumerGroup.updateResolvedRegularExpression(
group.updateResolvedRegularExpression(
regex, regex,
new ResolvedRegularExpression( new ResolvedRegularExpression(
new HashSet<>(value.topics()), new HashSet<>(value.topics()),
@ -4319,13 +4328,10 @@ public class GroupMetadataManager {
) )
); );
} else { } else {
try { consumerGroup.removeResolvedRegularExpression(regex);
ConsumerGroup group = getOrMaybeCreatePersistedConsumerGroup(groupId, false);
group.removeResolvedRegularExpression(regex);
} catch (GroupIdNotFoundException ex) {
// If the group does not exist, we can ignore the tombstone.
}
} }
updateGroupsByTopics(groupId, oldSubscribedTopicNames, consumerGroup.subscribedTopicNames().keySet());
} }
/** /**

View File

@ -16076,6 +16076,11 @@ public class GroupMetadataManagerTest {
Optional.of(resolvedRegularExpression), Optional.of(resolvedRegularExpression),
context.groupMetadataManager.consumerGroup("foo").resolvedRegularExpression("abc*") context.groupMetadataManager.consumerGroup("foo").resolvedRegularExpression("abc*")
); );
assertEquals(
Set.of("foo"),
context.groupMetadataManager.groupsSubscribedToTopic("abc")
);
} }
@Test @Test
@ -16101,6 +16106,11 @@ public class GroupMetadataManagerTest {
resolvedRegularExpression resolvedRegularExpression
)); ));
assertEquals(
Set.of("foo"),
context.groupMetadataManager.groupsSubscribedToTopic("abc")
);
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone( context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(
"foo", "foo",
"abc*" "abc*"
@ -16110,6 +16120,11 @@ public class GroupMetadataManagerTest {
Optional.empty(), Optional.empty(),
context.groupMetadataManager.consumerGroup("foo").resolvedRegularExpression("abc*") context.groupMetadataManager.consumerGroup("foo").resolvedRegularExpression("abc*")
); );
assertEquals(
Set.of(),
context.groupMetadataManager.groupsSubscribedToTopic("abc")
);
} }
@Test @Test