diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala index 6b42c4f566a..e94bcbc56a3 100644 --- a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala @@ -20,7 +20,7 @@ import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterFeature, import kafka.utils.TestUtils import org.apache.kafka.clients.admin.AlterConfigOp.OpType import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry} -import org.apache.kafka.common.Uuid +import org.apache.kafka.common.{TopicCollection, Uuid} import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData} import org.apache.kafka.common.protocol.Errors @@ -174,7 +174,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { ) // Heartbeat request to join the group. Note that the member subscribes - // to an nonexistent topic. + // to a nonexistent topic. var consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( new ConsumerGroupHeartbeatRequestData() .setGroupId("grp") @@ -214,7 +214,7 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { ).build() // This is the expected assignment. - val expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment() + var expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment() .setTopicPartitions(List(new ConsumerGroupHeartbeatResponseData.TopicPartitions() .setTopicId(topicId) .setPartitions(List[Integer](0, 1, 2).asJava)).asJava) @@ -230,6 +230,32 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { // Verify the response. assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch) assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment) + + // Delete the topic. + admin.deleteTopics(TopicCollection.ofTopicIds(List(topicId).asJava)).all.get + + // Prepare the next heartbeat. + consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() + .setGroupId("grp") + .setMemberId(consumerGroupHeartbeatResponse.data.memberId) + .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch) + ).build() + + // This is the expected assignment. + expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment() + + // Heartbeats until the partitions are revoked. + consumerGroupHeartbeatResponse = null + TestUtils.waitUntilTrue(() => { + consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest) + consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code && + consumerGroupHeartbeatResponse.data.assignment == expectedAssignment + }, msg = s"Could not get partitions revoked. Last response $consumerGroupHeartbeatResponse.") + + // Verify the response. + assertEquals(2, consumerGroupHeartbeatResponse.data.memberEpoch) + assertEquals(expectedAssignment, consumerGroupHeartbeatResponse.data.assignment) } finally { admin.close() } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index f71b47690d0..d3047053a58 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -4308,9 +4308,18 @@ public class GroupMetadataManager { String groupId = key.groupId(); String regex = key.regularExpression(); + ConsumerGroup consumerGroup; + try { + consumerGroup = getOrMaybeCreatePersistedConsumerGroup(groupId, value != null); + } catch (GroupIdNotFoundException ex) { + // If the group does not exist and a tombstone is replayed, we can ignore it. + return; + } + + Set oldSubscribedTopicNames = new HashSet<>(consumerGroup.subscribedTopicNames().keySet()); + if (value != null) { - ConsumerGroup group = getOrMaybeCreatePersistedConsumerGroup(groupId, true); - group.updateResolvedRegularExpression( + consumerGroup.updateResolvedRegularExpression( regex, new ResolvedRegularExpression( new HashSet<>(value.topics()), @@ -4319,13 +4328,10 @@ public class GroupMetadataManager { ) ); } else { - try { - ConsumerGroup group = getOrMaybeCreatePersistedConsumerGroup(groupId, false); - group.removeResolvedRegularExpression(regex); - } catch (GroupIdNotFoundException ex) { - // If the group does not exist, we can ignore the tombstone. - } + consumerGroup.removeResolvedRegularExpression(regex); } + + updateGroupsByTopics(groupId, oldSubscribedTopicNames, consumerGroup.subscribedTopicNames().keySet()); } /** diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 4933e505067..f83bd1c267b 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -16076,6 +16076,11 @@ public class GroupMetadataManagerTest { Optional.of(resolvedRegularExpression), context.groupMetadataManager.consumerGroup("foo").resolvedRegularExpression("abc*") ); + + assertEquals( + Set.of("foo"), + context.groupMetadataManager.groupsSubscribedToTopic("abc") + ); } @Test @@ -16101,6 +16106,11 @@ public class GroupMetadataManagerTest { resolvedRegularExpression )); + assertEquals( + Set.of("foo"), + context.groupMetadataManager.groupsSubscribedToTopic("abc") + ); + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone( "foo", "abc*" @@ -16110,6 +16120,11 @@ public class GroupMetadataManagerTest { Optional.empty(), context.groupMetadataManager.consumerGroup("foo").resolvedRegularExpression("abc*") ); + + assertEquals( + Set.of(), + context.groupMetadataManager.groupsSubscribedToTopic("abc") + ); } @Test