diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java index 288e8d3abe1..e6cf1700cc8 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java @@ -28,7 +28,6 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataV import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; import org.apache.kafka.coordinator.group.generated.ConsumerGroupRegularExpressionKey; import org.apache.kafka.coordinator.group.generated.ConsumerGroupRegularExpressionValue; import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; @@ -127,36 +126,6 @@ public class GroupCoordinatorRecordHelpers { ); } - /** - * Creates a ConsumerGroupPartitionMetadata record. - * - * @param groupId The consumer group id. - * @param newSubscriptionMetadata The subscription metadata. - * @return The record. - */ - public static CoordinatorRecord newConsumerGroupSubscriptionMetadataRecord( - String groupId, - Map newSubscriptionMetadata - ) { - ConsumerGroupPartitionMetadataValue value = new ConsumerGroupPartitionMetadataValue(); - newSubscriptionMetadata.forEach((topicName, topicMetadata) -> - value.topics().add(new ConsumerGroupPartitionMetadataValue.TopicMetadata() - .setTopicId(topicMetadata.id()) - .setTopicName(topicMetadata.name()) - .setNumPartitions(topicMetadata.numPartitions()) - ) - ); - - return CoordinatorRecord.record( - new ConsumerGroupPartitionMetadataKey() - .setGroupId(groupId), - new ApiMessageAndVersion( - value, - (short) 0 - ) - ); - } - /** * Creates a ConsumerGroupPartitionMetadata tombstone. * diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 1dd7d2ed884..b56a9add64f 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -229,7 +229,7 @@ import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.n import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord; import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord; import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone; -import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord; +import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord; import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord; import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentRecord; import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentTombstoneRecord; @@ -490,6 +490,13 @@ public class GroupMetadataManager { */ private MetadataImage metadataImage; + /** + * The cache for topic hash value by topic name. + * A topic hash is calculated when there is a group subscribes to it. + * A topic hash is removed when it's updated in MetadataImage or there is no group subscribes to it. + */ + private final Map topicHashCache; + /** * This tracks the version (or the offset) of the last metadata image * with newly created topics. @@ -550,6 +557,7 @@ public class GroupMetadataManager { this.shareGroupAssignor = shareGroupAssignor; this.authorizerPlugin = authorizerPlugin; this.streamsGroupAssignors = streamsGroupAssignors.stream().collect(Collectors.toMap(TaskAssignor::name, Function.identity())); + this.topicHashCache = new HashMap<>(); } /** @@ -1355,8 +1363,8 @@ public class GroupMetadataManager { snapshotRegistry, metrics, classicGroup, - metadataImage.topics(), - metadataImage.cluster() + topicHashCache, + metadataImage ); } catch (SchemaException e) { log.warn("Cannot upgrade classic group " + classicGroup.groupId() + @@ -3302,26 +3310,24 @@ public class GroupMetadataManager { )); } - // Compute the subscription metadata. - Map subscriptionMetadata = group.computeSubscriptionMetadata( + long groupMetadataHash = ModernGroup.computeMetadataHash( subscribedTopicNames, - metadataImage.topics(), - metadataImage.cluster() + topicHashCache, + metadataImage ); - if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { + if (groupMetadataHash != group.metadataHash()) { if (log.isDebugEnabled()) { - log.debug("[GroupId {}] Computed new subscription metadata: {}.", - groupId, subscriptionMetadata); + log.debug("[GroupId {}] Computed new metadata hash: {}.", + groupId, groupMetadataHash); } bumpGroupEpoch = true; - records.add(newConsumerGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata)); } if (bumpGroupEpoch) { int groupEpoch = group.groupEpoch() + 1; - records.add(newConsumerGroupEpochRecord(groupId, groupEpoch, 0)); - log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch); + records.add(newConsumerGroupEpochRecord(groupId, groupEpoch, groupMetadataHash)); + log.info("[GroupId {}] Bumped group epoch to {} with metadata hash {}.", groupId, groupEpoch, groupMetadataHash); metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME); group.setMetadataRefreshDeadline( time.milliseconds() + METADATA_REFRESH_INTERVAL_MS, @@ -3608,10 +3614,11 @@ public class GroupMetadataManager { member, updatedMember ); - Map subscriptionMetadata = group.computeSubscriptionMetadata( + + long groupMetadataHash = ModernGroup.computeMetadataHash( subscribedTopicNamesMap, - metadataImage.topics(), - metadataImage.cluster() + topicHashCache, + metadataImage ); int numMembers = group.numMembers(); @@ -3625,24 +3632,30 @@ public class GroupMetadataManager { numMembers ); - if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { + if (groupMetadataHash != group.metadataHash()) { if (log.isDebugEnabled()) { - log.debug("[GroupId {}] Computed new subscription metadata: {}.", - groupId, subscriptionMetadata); + log.debug("[GroupId {}] Computed new metadata hash: {}.", + groupId, groupMetadataHash); } bumpGroupEpoch = true; - records.add(newConsumerGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata)); } if (bumpGroupEpoch) { groupEpoch += 1; - records.add(newConsumerGroupEpochRecord(groupId, groupEpoch, 0)); - log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch); + records.add(newConsumerGroupEpochRecord(groupId, groupEpoch, groupMetadataHash)); + log.info("[GroupId {}] Bumped group epoch to {} with metadata hash {}.", groupId, groupEpoch, groupMetadataHash); metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME); } group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch); + // Before 4.0, the coordinator used subscription metadata to keep topic metadata. + // After 4.1, the subscription metadata is replaced by the metadata hash. If there is subscription metadata in log, + // add a tombstone record to remove it. + if (group.hasSubscriptionMetadataRecord()) { + records.add(newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId)); + } + return new UpdateSubscriptionMetadataResult( groupEpoch, subscriptionType @@ -4015,25 +4028,27 @@ public class GroupMetadataManager { members ); - // We update the subscription metadata without the leaving members. - Map subscriptionMetadata = group.computeSubscriptionMetadata( - group.computeSubscribedTopicNamesWithoutDeletedMembers(members, deletedRegexes), - metadataImage.topics(), - metadataImage.cluster() + Map subscribedTopicNamesMap = group.computeSubscribedTopicNamesWithoutDeletedMembers( + members, + deletedRegexes + ); + long groupMetadataHash = ModernGroup.computeMetadataHash( + subscribedTopicNamesMap, + topicHashCache, + metadataImage ); - if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { + if (groupMetadataHash != group.metadataHash()) { if (log.isDebugEnabled()) { - log.debug("[GroupId {}] Computed new subscription metadata: {}.", - group.groupId(), subscriptionMetadata); + log.debug("[GroupId {}] Computed new metadata hash: {}.", + group.groupId(), groupMetadataHash); } - records.add(newConsumerGroupSubscriptionMetadataRecord(group.groupId(), subscriptionMetadata)); } // We bump the group epoch. int groupEpoch = group.groupEpoch() + 1; - records.add(newConsumerGroupEpochRecord(group.groupId(), groupEpoch, 0)); - log.info("[GroupId {}] Bumped group epoch to {}.", group.groupId(), groupEpoch); + records.add(newConsumerGroupEpochRecord(group.groupId(), groupEpoch, groupMetadataHash)); + log.info("[GroupId {}] Bumped group epoch to {} with metadata hash {}.", group.groupId(), groupEpoch, groupMetadataHash); for (ConsumerGroupMember member : members) { cancelTimers(group.groupId(), member.memberId()); @@ -4981,7 +4996,11 @@ public class GroupMetadataManager { ) { groupsByTopics.computeIfPresent(topicName, (__, groupIds) -> { groupIds.remove(groupId); - return groupIds.isEmpty() ? null : groupIds; + if (groupIds.isEmpty()) { + topicHashCache.remove(topicName); + return null; + } + return groupIds; }); } @@ -5036,6 +5055,7 @@ public class GroupMetadataManager { if (value != null) { ConsumerGroup consumerGroup = getOrMaybeCreatePersistedConsumerGroup(groupId, true); consumerGroup.setGroupEpoch(value.epoch()); + consumerGroup.setMetadataHash(value.metadataHash()); } else { ConsumerGroup consumerGroup; try { @@ -5085,15 +5105,9 @@ public class GroupMetadataManager { return; } - if (value != null) { - Map subscriptionMetadata = new HashMap<>(); - value.topics().forEach(topicMetadata -> { - subscriptionMetadata.put(topicMetadata.topicName(), TopicMetadata.fromRecord(topicMetadata)); - }); - group.setSubscriptionMetadata(subscriptionMetadata); - } else { - group.setSubscriptionMetadata(Map.of()); - } + // If value is not null, add subscription metadata tombstone record in the next consumer group heartbeat, + // because the subscription metadata is replaced by metadata hash in ConsumerGroupMetadataValue. + group.setHasSubscriptionMetadataRecord(value != null); } /** @@ -5734,11 +5748,15 @@ public class GroupMetadataManager { Set allGroupIds = new HashSet<>(); topicsDelta.changedTopics().forEach((topicId, topicDelta) -> { String topicName = topicDelta.name(); + // Remove topic hash from the cache to recalculate it. + topicHashCache.remove(topicName); allGroupIds.addAll(groupsSubscribedToTopic(topicName)); }); topicsDelta.deletedTopicIds().forEach(topicId -> { TopicImage topicImage = delta.image().topics().getTopic(topicId); - allGroupIds.addAll(groupsSubscribedToTopic(topicImage.name())); + String topicName = topicImage.name(); + topicHashCache.remove(topicName); + allGroupIds.addAll(groupsSubscribedToTopic(topicName)); }); allGroupIds.forEach(groupId -> { Group group = groups.get(groupId); @@ -8375,6 +8393,10 @@ public class GroupMetadataManager { return Collections.unmodifiableSet(this.groups.keySet()); } + // Visible for testing + Map topicHashCache() { + return Collections.unmodifiableMap(this.topicHashCache); + } /** * Get the session timeout of the provided consumer group. diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java index f7441fce0b1..d614123d2a7 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java @@ -348,7 +348,7 @@ public class Utils { * @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash. * @return The hash of the group. */ - static long computeGroupHash(Map topicHashes) { + public static long computeGroupHash(Map topicHashes) { if (topicHashes.isEmpty()) { return 0; } @@ -386,7 +386,7 @@ public class Utils { * @param metadataImage The cluster image. * @return The hash of the topic. */ - static long computeTopicHash(String topicName, MetadataImage metadataImage) { + public static long computeTopicHash(String topicName, MetadataImage metadataImage) { TopicImage topicImage = metadataImage.topics().getTopic(topicName); if (topicImage == null) { return 0; diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java index 0c6ff25cb13..7439216abfe 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java @@ -20,13 +20,16 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.coordinator.group.Group; +import org.apache.kafka.coordinator.group.Utils; import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType; import org.apache.kafka.image.ClusterImage; +import org.apache.kafka.image.MetadataImage; import org.apache.kafka.image.TopicImage; import org.apache.kafka.image.TopicsImage; import org.apache.kafka.timeline.SnapshotRegistry; import org.apache.kafka.timeline.TimelineHashMap; import org.apache.kafka.timeline.TimelineInteger; +import org.apache.kafka.timeline.TimelineLong; import org.apache.kafka.timeline.TimelineObject; import java.util.Collections; @@ -88,6 +91,11 @@ public abstract class ModernGroup implements Group */ protected final TimelineHashMap subscribedTopicMetadata; + /** + * The metadata hash which is computed based on the all subscribed topics. + */ + protected final TimelineLong metadataHash; + /** * The group's subscription type. * This value is set to Homogeneous by default. @@ -134,6 +142,7 @@ public abstract class ModernGroup implements Group this.members = new TimelineHashMap<>(snapshotRegistry, 0); this.subscribedTopicNames = new TimelineHashMap<>(snapshotRegistry, 0); this.subscribedTopicMetadata = new TimelineHashMap<>(snapshotRegistry, 0); + this.metadataHash = new TimelineLong(snapshotRegistry); this.subscriptionType = new TimelineObject<>(snapshotRegistry, HOMOGENEOUS); this.targetAssignmentEpoch = new TimelineInteger(snapshotRegistry); this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0); @@ -355,6 +364,13 @@ public abstract class ModernGroup implements Group return Collections.unmodifiableMap(subscribedTopicMetadata); } + /** + * @return The metadata hash. + */ + public long metadataHash() { + return metadataHash.get(); + } + /** * Updates the subscription metadata. This replaces the previous one. * @@ -367,6 +383,15 @@ public abstract class ModernGroup implements Group this.subscribedTopicMetadata.putAll(subscriptionMetadata); } + /** + * Updates the metadata hash. + * + * @param metadataHash The new metadata hash. + */ + public void setMetadataHash(long metadataHash) { + this.metadataHash.set(metadataHash); + } + /** * Computes the subscription metadata based on the current subscription info. * @@ -398,6 +423,24 @@ public abstract class ModernGroup implements Group return Collections.unmodifiableMap(newSubscriptionMetadata); } + public static long computeMetadataHash( + Map subscribedTopicNames, + Map topicHashCache, + MetadataImage metadataImage + ) { + Map topicHash = new HashMap<>(subscribedTopicNames.size()); + subscribedTopicNames.keySet().forEach(topicName -> { + TopicImage topicImage = metadataImage.topics().getTopic(topicName); + if (topicImage != null) { + topicHash.put( + topicName, + topicHashCache.computeIfAbsent(topicName, k -> Utils.computeTopicHash(topicName, metadataImage)) + ); + } + }); + return Utils.computeGroupHash(topicHash); + } + /** * Updates the metadata refresh deadline. * diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java index 2be03621213..880cd49769c 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java @@ -43,7 +43,7 @@ import org.apache.kafka.coordinator.group.modern.MemberState; import org.apache.kafka.coordinator.group.modern.ModernGroup; import org.apache.kafka.coordinator.group.modern.ModernGroupMember; import org.apache.kafka.coordinator.group.modern.SubscriptionCount; -import org.apache.kafka.image.ClusterImage; +import org.apache.kafka.image.MetadataImage; import org.apache.kafka.image.TopicsImage; import org.apache.kafka.timeline.SnapshotRegistry; import org.apache.kafka.timeline.TimelineHashMap; @@ -152,6 +152,8 @@ public class ConsumerGroup extends ModernGroup { */ private final TimelineHashMap resolvedRegularExpressions; + private final TimelineObject hasSubscriptionMetadataRecord; + public ConsumerGroup( SnapshotRegistry snapshotRegistry, String groupId, @@ -167,6 +169,7 @@ public class ConsumerGroup extends ModernGroup { this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry, 0); this.subscribedRegularExpressions = new TimelineHashMap<>(snapshotRegistry, 0); this.resolvedRegularExpressions = new TimelineHashMap<>(snapshotRegistry, 0); + this.hasSubscriptionMetadataRecord = new TimelineObject<>(snapshotRegistry, false); } /** @@ -1130,8 +1133,8 @@ public class ConsumerGroup extends ModernGroup { * @param snapshotRegistry The SnapshotRegistry. * @param metrics The GroupCoordinatorMetricsShard. * @param classicGroup The converted classic group. - * @param topicsImage The current metadata for all available topics. - * @param clusterImage The current metadata for the Kafka cluster. + * @param topicHashCache The cache for topic hashes. + * @param metadataImage The current metadata image for the Kafka cluster. * @return The created ConsumerGroup. * * @throws SchemaException if any member's subscription or assignment cannot be deserialized. @@ -1141,8 +1144,8 @@ public class ConsumerGroup extends ModernGroup { SnapshotRegistry snapshotRegistry, GroupCoordinatorMetricsShard metrics, ClassicGroup classicGroup, - TopicsImage topicsImage, - ClusterImage clusterImage + Map topicHashCache, + MetadataImage metadataImage ) { String groupId = classicGroup.groupId(); ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics); @@ -1162,7 +1165,7 @@ public class ConsumerGroup extends ModernGroup { if (assignment.userData() != null && assignment.userData().hasRemaining()) { throw new UnsupportedVersionException("userData from a custom assignor would be lost"); } - assignedPartitions = toTopicPartitionMap(assignment, topicsImage); + assignedPartitions = toTopicPartitionMap(assignment, metadataImage.topics()); } // Every member is guaranteed to have metadata set when it joins, @@ -1198,10 +1201,10 @@ public class ConsumerGroup extends ModernGroup { consumerGroup.updateMember(newMember); }); - consumerGroup.setSubscriptionMetadata(consumerGroup.computeSubscriptionMetadata( + consumerGroup.setMetadataHash(ModernGroup.computeMetadataHash( consumerGroup.subscribedTopicNames(), - topicsImage, - clusterImage + topicHashCache, + metadataImage )); return consumerGroup; @@ -1219,9 +1222,7 @@ public class ConsumerGroup extends ModernGroup { records.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId(), consumerGroupMember)) ); - records.add(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId(), subscriptionMetadata())); - - records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId(), groupEpoch(), 0)); + records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId(), groupEpoch(), metadataHash())); members().forEach((consumerGroupMemberId, consumerGroupMember) -> records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord( @@ -1309,4 +1310,12 @@ public class ConsumerGroup extends ModernGroup { } return false; } + + public void setHasSubscriptionMetadataRecord(boolean hasSubscriptionMetadataRecord) { + this.hasSubscriptionMetadataRecord.set(hasSubscriptionMetadataRecord); + } + + public boolean hasSubscriptionMetadataRecord() { + return hasSubscriptionMetadataRecord.get(); + } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java index 5ef33035e57..38ee8501ee2 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java @@ -33,7 +33,6 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataV import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; -import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; import org.apache.kafka.coordinator.group.generated.ConsumerGroupRegularExpressionKey; import org.apache.kafka.coordinator.group.generated.ConsumerGroupRegularExpressionValue; import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey; @@ -49,7 +48,6 @@ import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue; import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey; import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue; import org.apache.kafka.coordinator.group.modern.MemberState; -import org.apache.kafka.coordinator.group.modern.TopicMetadata; import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember; import org.apache.kafka.coordinator.group.modern.consumer.ResolvedRegularExpression; import org.apache.kafka.coordinator.group.modern.share.ShareGroup.InitMapValue; @@ -62,7 +60,6 @@ import org.junit.jupiter.params.provider.MethodSource; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -72,7 +69,6 @@ import java.util.OptionalLong; import java.util.Set; import java.util.stream.Stream; -import static org.apache.kafka.coordinator.group.Assertions.assertRecordEquals; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkOrderedAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkOrderedTopicAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; @@ -82,7 +78,6 @@ import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.n import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord; import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord; import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord; -import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord; import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord; import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord; import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord; @@ -155,47 +150,6 @@ public class GroupCoordinatorRecordHelpersTest { )); } - @Test - public void testNewConsumerGroupSubscriptionMetadataRecord() { - Uuid fooTopicId = Uuid.randomUuid(); - Uuid barTopicId = Uuid.randomUuid(); - Map subscriptionMetadata = new LinkedHashMap<>(); - - subscriptionMetadata.put("foo", new TopicMetadata( - fooTopicId, - "foo", - 10 - )); - subscriptionMetadata.put("bar", new TopicMetadata( - barTopicId, - "bar", - 20 - )); - - CoordinatorRecord expectedRecord = CoordinatorRecord.record( - new ConsumerGroupPartitionMetadataKey() - .setGroupId("group-id"), - new ApiMessageAndVersion( - new ConsumerGroupPartitionMetadataValue() - .setTopics(Arrays.asList( - new ConsumerGroupPartitionMetadataValue.TopicMetadata() - .setTopicId(fooTopicId) - .setTopicName("foo") - .setNumPartitions(10), - new ConsumerGroupPartitionMetadataValue.TopicMetadata() - .setTopicId(barTopicId) - .setTopicName("bar") - .setNumPartitions(20))), - (short) 0 - ) - ); - - assertRecordEquals(expectedRecord, newConsumerGroupSubscriptionMetadataRecord( - "group-id", - subscriptionMetadata - )); - } - @Test public void testNewConsumerGroupSubscriptionMetadataTombstoneRecord() { CoordinatorRecord expectedRecord = CoordinatorRecord.tombstone( @@ -208,47 +162,6 @@ public class GroupCoordinatorRecordHelpersTest { )); } - @Test - public void testEmptyPartitionMetadataWhenRacksUnavailableGroupSubscriptionMetadataRecord() { - Uuid fooTopicId = Uuid.randomUuid(); - Uuid barTopicId = Uuid.randomUuid(); - Map subscriptionMetadata = new LinkedHashMap<>(); - - subscriptionMetadata.put("foo", new TopicMetadata( - fooTopicId, - "foo", - 10 - )); - subscriptionMetadata.put("bar", new TopicMetadata( - barTopicId, - "bar", - 20 - )); - - CoordinatorRecord expectedRecord = CoordinatorRecord.record( - new ConsumerGroupPartitionMetadataKey() - .setGroupId("group-id"), - new ApiMessageAndVersion( - new ConsumerGroupPartitionMetadataValue() - .setTopics(Arrays.asList( - new ConsumerGroupPartitionMetadataValue.TopicMetadata() - .setTopicId(fooTopicId) - .setTopicName("foo") - .setNumPartitions(10), - new ConsumerGroupPartitionMetadataValue.TopicMetadata() - .setTopicId(barTopicId) - .setTopicName("bar") - .setNumPartitions(20))), - (short) 0 - ) - ); - - assertRecordEquals(expectedRecord, newConsumerGroupSubscriptionMetadataRecord( - "group-id", - subscriptionMetadata - )); - } - @Test public void testNewConsumerGroupEpochRecord() { CoordinatorRecord expectedRecord = CoordinatorRecord.record( diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index bf6884a5336..fc95e78e9eb 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -91,6 +91,8 @@ import org.apache.kafka.coordinator.group.classic.ClassicGroup; import org.apache.kafka.coordinator.group.classic.ClassicGroupMember; import org.apache.kafka.coordinator.group.classic.ClassicGroupState; import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue; import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey; import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue; @@ -190,6 +192,8 @@ import static org.apache.kafka.coordinator.group.GroupMetadataManager.groupSessi import static org.apache.kafka.coordinator.group.GroupMetadataManagerTestContext.DEFAULT_CLIENT_ADDRESS; import static org.apache.kafka.coordinator.group.GroupMetadataManagerTestContext.DEFAULT_CLIENT_ID; import static org.apache.kafka.coordinator.group.GroupMetadataManagerTestContext.DEFAULT_PROCESS_ID; +import static org.apache.kafka.coordinator.group.Utils.computeGroupHash; +import static org.apache.kafka.coordinator.group.Utils.computeTopicHash; import static org.apache.kafka.coordinator.group.classic.ClassicGroupMember.EMPTY_ASSIGNMENT; import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.COMPLETING_REBALANCE; import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.DEAD; @@ -462,14 +466,16 @@ public class GroupMetadataManagerTest { Uuid barTopicId = Uuid.randomUuid(); String barTopicName = "bar"; + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .addRacks() + .build(); + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 6) - .addTopic(barTopicId, barTopicName, 3) - .addRacks() - .build()) + .withMetadataImage(metadataImage) .build(); assignor.prepareGroupAssignment(new GroupAssignment( @@ -525,11 +531,10 @@ public class GroupMetadataManagerTest { List expectedRecords = List.of( GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember), - GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 3) - )), - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0), + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage), + barTopicName, computeTopicHash(barTopicName, metadataImage) + ))), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment( mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5), mkTopicAssignment(barTopicId, 0, 1, 2) @@ -541,6 +546,110 @@ public class GroupMetadataManagerTest { assertRecordsEquals(expectedRecords, result.records()); } + @Test + public void testTopicHashIsRemoveFromCacheIfNoGroupSubscribesIt() { + String groupId = "fooup"; + // Use a static member id as it makes the test easier. + String memberId = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addRacks() + .build(); + long fooTopicHash = computeTopicHash(fooTopicName, metadataImage); + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) + .withMetadataImage(metadataImage) + .build(); + + assignor.prepareGroupAssignment(new GroupAssignment( + Map.of(memberId, new MemberAssignmentImpl(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) + ))) + )); + + CoordinatorResult result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setServerAssignor("range") + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(List.of("foo")) + .setTopicPartitions(List.of())); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId) + .setMemberEpoch(1) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() + .setTopicPartitions(List.of( + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(List.of(0, 1, 2, 3, 4, 5)) + ))), + result.response() + ); + + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(1) + .setPreviousMemberEpoch(0) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(List.of("foo")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) + .build(); + + List expectedRecords = List.of( + GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember), + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, computeGroupHash(Map.of( + fooTopicName, fooTopicHash + ))), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) + )), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1), + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember) + ); + + assertRecordsEquals(expectedRecords, result.records()); + assertEquals(Map.of(fooTopicName, fooTopicHash), context.groupMetadataManager.topicHashCache()); + + // Use LEAVE_GROUP_MEMBER_EPOCH to leave group, so there is no group subscribes to foo. + result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId) + .setMemberEpoch(-1) + .setHeartbeatIntervalMs(0), + result.response() + ); + + expectedRecords = List.of( + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId), + GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId), + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, 0) + ); + assertRecordsEquals(expectedRecords, result.records()); + assertEquals(Map.of(), context.groupMetadataManager.topicHashCache()); + } + @Test public void testUpdatingSubscriptionTriggersNewTargetAssignment() { String groupId = "fooup"; @@ -552,14 +661,18 @@ public class GroupMetadataManagerTest { Uuid barTopicId = Uuid.randomUuid(); String barTopicName = "bar"; + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .addRacks() + .build(); + long fooTopicHash = computeTopicHash(fooTopicName, metadataImage); + long barTopicHash = computeTopicHash(barTopicName, metadataImage); + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 6) - .addTopic(barTopicId, barTopicName, 3) - .addRacks() - .build()) + .withMetadataImage(metadataImage) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId) .setState(MemberState.STABLE) @@ -574,7 +687,8 @@ public class GroupMetadataManagerTest { .build()) .withAssignment(memberId, mkAssignment( mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) - .withAssignmentEpoch(10)) + .withAssignmentEpoch(10) + .withMetadataHash(computeGroupHash(Map.of(fooTopicName, fooTopicHash)))) .build(); assignor.prepareGroupAssignment(new GroupAssignment( @@ -623,11 +737,10 @@ public class GroupMetadataManagerTest { List expectedRecords = List.of( GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember), - GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 3) - )), - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0), + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, computeGroupHash(Map.of( + fooTopicName, fooTopicHash, + barTopicName, barTopicHash + ))), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment( mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5), mkTopicAssignment(barTopicId, 0, 1, 2) @@ -639,6 +752,280 @@ public class GroupMetadataManagerTest { assertRecordsEquals(expectedRecords, result.records()); } + @Test + public void testNewRacksDataInMetadataImageTriggersEpochBump() { + String groupId = "fooup"; + // Use a static member id as it makes the test easier. + String memberId = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .build(); + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) + .withMetadataImage(metadataImage) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setSubscribedTopicNames(List.of("foo")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) + .withAssignmentEpoch(10) + .withMetadataHash(computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage)) + ))) + .build(); + + assignor.prepareGroupAssignment(new GroupAssignment( + Map.of(memberId, new MemberAssignmentImpl(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) + ))) + )); + + // Update metadata image with racks. + MetadataImage newMetadataImage = new MetadataImageBuilder(metadataImage) + .addTopic(fooTopicId, fooTopicName, 6) + .addRacks() + .build(); + + context.groupMetadataManager.onNewMetadataImage( + newMetadataImage, + new MetadataDelta(newMetadataImage) + ); + // If a topic is updated, related topic hash is cleanup. + assertEquals(Map.of(), context.groupMetadataManager.topicHashCache()); + + CoordinatorResult result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(10)); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId) + .setMemberEpoch(11) + .setHeartbeatIntervalMs(5000), + result.response() + ); + + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(11) + .setPreviousMemberEpoch(10) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setSubscribedTopicNames(List.of("foo")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) + .build(); + + List expectedRecords = List.of( + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, newMetadataImage) + ))), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11), + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember) + ); + + assertRecordsEquals(expectedRecords, result.records()); + } + + @Test + public void testRemoveTopicCleanupTopicHash() { + String groupId = "fooup"; + // Use a static member id as it makes the test easier. + String memberId = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .build(); + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) + .withMetadataImage(metadataImage) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setSubscribedTopicNames(List.of("foo")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) + .withAssignmentEpoch(10) + .withMetadataHash(computeGroupHash(Map.of( + fooTopicName, + computeTopicHash(fooTopicName, metadataImage)) + ))) + .build(); + + assignor.prepareGroupAssignment(new GroupAssignment( + Map.of(memberId, new MemberAssignmentImpl(Map.of())) + )); + + // Remove foo topic from metadata image. + MetadataDelta delta = new MetadataDelta(metadataImage); + delta.replay(new RemoveTopicRecord().setTopicId(fooTopicId)); + MetadataImage newMetadataImage = delta.apply(MetadataProvenance.EMPTY); + + context.groupMetadataManager.onNewMetadataImage( + newMetadataImage, + new MetadataDelta(newMetadataImage) + ); + // If a topic is removed, related topic hash is cleanup. + assertEquals(Map.of(), context.groupMetadataManager.topicHashCache()); + + CoordinatorResult result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(10)); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId) + .setMemberEpoch(10) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()), + result.response() + ); + + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.UNREVOKED_PARTITIONS) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setSubscribedTopicNames(List.of("foo")) + .setServerAssignorName("range") + .setPartitionsPendingRevocation(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) + .build(); + + List expectedRecords = List.of( + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, Map.of()), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11), + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember) + ); + + assertRecordsEquals(expectedRecords, result.records()); + } + + @Test + public void testSubscriptionUpgradeToMetadataHash() { + String groupId = "fooup"; + // Use a static member id as it makes the test easier. + String memberId = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addRacks() + .build(); + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) + .withMetadataImage(metadataImage) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setSubscribedTopicNames(List.of("foo")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) + .withAssignmentEpoch(10)) + .build(); + + ConsumerGroupPartitionMetadataValue consumerGroupPartitionMetadataValue = new ConsumerGroupPartitionMetadataValue(); + consumerGroupPartitionMetadataValue.topics().add(new ConsumerGroupPartitionMetadataValue.TopicMetadata() + .setTopicId(fooTopicId) + .setTopicName(fooTopicName) + .setNumPartitions(6)); + context.replay(CoordinatorRecord.record( + new ConsumerGroupPartitionMetadataKey().setGroupId(groupId), + new ApiMessageAndVersion(consumerGroupPartitionMetadataValue, (short) 0) + )); + context.commit(); + + assignor.prepareGroupAssignment(new GroupAssignment( + Map.of(memberId, new MemberAssignmentImpl(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) + ))) + )); + + CoordinatorResult result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(10)); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId) + .setMemberEpoch(11) + .setHeartbeatIntervalMs(5000), + result.response() + ); + + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(11) + .setPreviousMemberEpoch(10) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setSubscribedTopicNames(List.of("foo")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) + .build(); + + List expectedRecords = List.of( + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage) + ))), + GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId), + GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11), + GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember) + ); + + assertRecordsEquals(expectedRecords, result.records()); + } + @Test public void testNewJoiningMemberTriggersNewTargetAssignment() { String groupId = "fooup"; @@ -652,14 +1039,20 @@ public class GroupMetadataManagerTest { Uuid barTopicId = Uuid.randomUuid(); String barTopicName = "bar"; + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .addRacks() + .build(); + long groupMetadataHash = computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage), + barTopicName, computeTopicHash(barTopicName, metadataImage) + )); + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 6) - .addTopic(barTopicId, barTopicName, 3) - .addRacks() - .build()) + .withMetadataImage(metadataImage) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId1) .setState(MemberState.STABLE) @@ -693,7 +1086,8 @@ public class GroupMetadataManagerTest { .withAssignment(memberId2, mkAssignment( mkTopicAssignment(fooTopicId, 3, 4, 5), mkTopicAssignment(barTopicId, 2))) - .withAssignmentEpoch(10)) + .withAssignmentEpoch(10) + .withMetadataHash(groupMetadataHash)) .build(); assignor.prepareGroupAssignment(new GroupAssignment(Map.of( @@ -745,7 +1139,7 @@ public class GroupMetadataManagerTest { assertUnorderedRecordsEquals( List.of( List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember3)), - List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)), + List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, groupMetadataHash)), List.of( GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, mkAssignment( mkTopicAssignment(fooTopicId, 0, 1), @@ -782,16 +1176,20 @@ public class GroupMetadataManagerTest { String zarTopicName = "zar"; MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .addTopic(zarTopicId, zarTopicName, 1) + .addRacks() + .build(); + long fooTopicHash = computeTopicHash(fooTopicName, metadataImage); + long barTopicHash = computeTopicHash(barTopicName, metadataImage); + long zarTopicHash = computeTopicHash(zarTopicName, metadataImage); // Consumer group with two members. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 6) - .addTopic(barTopicId, barTopicName, 3) - .addTopic(zarTopicId, zarTopicName, 1) - .addRacks() - .build()) + .withMetadataImage(metadataImage) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId1) .setState(MemberState.STABLE) @@ -824,7 +1222,12 @@ public class GroupMetadataManagerTest { .withAssignment(memberId2, mkAssignment( mkTopicAssignment(fooTopicId, 3, 4, 5), mkTopicAssignment(barTopicId, 2))) - .withAssignmentEpoch(10)) + .withAssignmentEpoch(10) + .withMetadataHash(computeGroupHash(Map.of( + fooTopicName, fooTopicHash, + barTopicName, barTopicHash, + zarTopicName, zarTopicHash + )))) .build(); // Member 2 leaves the consumer group. @@ -848,12 +1251,11 @@ public class GroupMetadataManagerTest { GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId2), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId2), GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId2), - // Subscription metadata is recomputed because zar is no longer there. - GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 3) - )), - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0) + // Group metadata hash is recomputed because zar is no longer there. + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, computeGroupHash(Map.of( + fooTopicName, fooTopicHash, + barTopicName, barTopicHash + ))) ); assertRecordsEquals(expectedRecords, result.records()); @@ -874,14 +1276,20 @@ public class GroupMetadataManagerTest { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .addRacks() + .build(); + long groupMetadataHash = computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage), + barTopicName, computeTopicHash(barTopicName, metadataImage) + )); + // Consumer group with two static members. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 6) - .addTopic(barTopicId, barTopicName, 3) - .addRacks() - .build()) + .withMetadataImage(metadataImage) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId1) .setState(MemberState.STABLE) @@ -916,7 +1324,8 @@ public class GroupMetadataManagerTest { .withAssignment(memberId2, mkAssignment( mkTopicAssignment(fooTopicId, 3, 4, 5), mkTopicAssignment(barTopicId, 2))) - .withAssignmentEpoch(10)) + .withAssignmentEpoch(10) + .withMetadataHash(groupMetadataHash)) .build(); assignor.prepareGroupAssignment(new GroupAssignment(Map.of( @@ -970,7 +1379,7 @@ public class GroupMetadataManagerTest { assertUnorderedRecordsEquals( List.of( List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember3)), - List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)), + List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, groupMetadataHash)), List.of( GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, mkAssignment( mkTopicAssignment(fooTopicId, 0, 1), @@ -1035,14 +1444,16 @@ public class GroupMetadataManagerTest { mkTopicAssignment(barTopicId, 2))) .build(); + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .addRacks() + .build(); + // Consumer group with two static members. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 6) - .addTopic(barTopicId, barTopicName, 3) - .addRacks() - .build()) + .withMetadataImage(metadataImage) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(member1) .withMember(member2) @@ -1053,10 +1464,10 @@ public class GroupMetadataManagerTest { mkTopicAssignment(fooTopicId, 3, 4, 5), mkTopicAssignment(barTopicId, 2))) .withAssignmentEpoch(10) - .withSubscriptionMetadata(Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 3) - ))) + .withMetadataHash(computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage), + barTopicName, computeTopicHash(barTopicName, metadataImage) + )))) .build(); // Member 2 leaves the consumer group. @@ -1208,14 +1619,18 @@ public class GroupMetadataManagerTest { mkTopicAssignment(fooTopicId, 3, 4, 5))) .build(); + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .addRacks() + .build(); + long fooTopicHash = computeTopicHash(fooTopicName, metadataImage); + long barTopicHash = computeTopicHash(barTopicName, metadataImage); + // Consumer group with two static members. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 6) - .addTopic(barTopicId, barTopicName, 3) - .addRacks() - .build()) + .withMetadataImage(metadataImage) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(member1) .withMember(member2) @@ -1224,9 +1639,9 @@ public class GroupMetadataManagerTest { .withAssignment(memberId2, mkAssignment( mkTopicAssignment(fooTopicId, 3, 4, 5))) .withAssignmentEpoch(10) - .withSubscriptionMetadata( - Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6)) - )) + .withMetadataHash(computeGroupHash(Map.of( + fooTopicName, fooTopicHash + )))) .build(); assignor.prepareGroupAssignment(new GroupAssignment(Map.of( @@ -1336,11 +1751,10 @@ public class GroupMetadataManagerTest { // As the new member as a different subscribed topic set, a rebalance is triggered. GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedRejoinedMember), - GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 3) - )), - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0), + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, computeGroupHash(Map.of( + fooTopicName, fooTopicHash, + barTopicName, barTopicHash + ))), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, member2RejoinId, mkAssignment( mkTopicAssignment(fooTopicId, 3, 4, 5), mkTopicAssignment(barTopicId, 0, 1, 2) @@ -1396,14 +1810,16 @@ public class GroupMetadataManagerTest { mkTopicAssignment(barTopicId, 2))) .build(); + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .addRacks() + .build(); + // Consumer group with two static members. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 6) - .addTopic(barTopicId, barTopicName, 3) - .addRacks() - .build()) + .withMetadataImage(metadataImage) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(member1) .withMember(member2) @@ -1413,7 +1829,11 @@ public class GroupMetadataManagerTest { .withAssignment(memberId2, mkAssignment( mkTopicAssignment(fooTopicId, 3, 4, 5), mkTopicAssignment(barTopicId, 2))) - .withAssignmentEpoch(10)) + .withAssignmentEpoch(10) + .withMetadataHash(computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage), + barTopicName, computeTopicHash(barTopicName, metadataImage) + )))) .build(); // Member 2 leaves the consumer group. @@ -1460,15 +1880,20 @@ public class GroupMetadataManagerTest { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .addTopic(zarTopicId, zarTopicName, 1) + .addRacks() + .build(); + long fooTopicHash = computeTopicHash(fooTopicName, metadataImage); + long barTopicHash = computeTopicHash(barTopicName, metadataImage); + long zarTopicHash = computeTopicHash(zarTopicName, metadataImage); + // Consumer group with two static members. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 6) - .addTopic(barTopicId, barTopicName, 3) - .addTopic(zarTopicId, zarTopicName, 1) - .addRacks() - .build()) + .withMetadataImage(metadataImage) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId1) .setState(MemberState.STABLE) @@ -1503,7 +1928,12 @@ public class GroupMetadataManagerTest { .withAssignment(memberId2, mkAssignment( mkTopicAssignment(fooTopicId, 3, 4, 5), mkTopicAssignment(barTopicId, 2))) - .withAssignmentEpoch(10)) + .withAssignmentEpoch(10) + .withMetadataHash(computeGroupHash(Map.of( + fooTopicName, fooTopicHash, + barTopicName, barTopicHash, + zarTopicName, zarTopicHash + )))) .build(); // Member 2 leaves the consumer group. @@ -1528,12 +1958,11 @@ public class GroupMetadataManagerTest { GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId2), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId2), GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId2), - // Subscription metadata is recomputed because zar is no longer there. - GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 3) - )), - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0) + // Group metadata hash is recomputed because zar is no longer there. + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, computeGroupHash(Map.of( + fooTopicName, fooTopicHash, + barTopicName, barTopicHash + ))) ); assertRecordsEquals(expectedRecords, result.records()); @@ -1551,13 +1980,15 @@ public class GroupMetadataManagerTest { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addRacks() + .build(); + // Consumer group with one static member. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 6) - .addRacks() - .build()) + .withMetadataImage(metadataImage) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId1) .setState(MemberState.STABLE) @@ -1573,7 +2004,11 @@ public class GroupMetadataManagerTest { .build()) .withAssignment(memberId1, mkAssignment( mkTopicAssignment(fooTopicId, 0, 1, 2))) - .withAssignmentEpoch(10)) + .withAssignmentEpoch(10) + .withMetadataHash(computeGroupHash(Map.of( + fooTopicName, + computeTopicHash(fooTopicName, metadataImage)) + ))) .build(); // Member 2 joins the consumer group with an in-use instance id. @@ -1601,12 +2036,14 @@ public class GroupMetadataManagerTest { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .build(); + // Consumer group with one static member. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 6) - .build()) + .withMetadataImage(metadataImage) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId1) .setState(MemberState.STABLE) @@ -1622,7 +2059,11 @@ public class GroupMetadataManagerTest { .build()) .withAssignment(memberId1, mkAssignment( mkTopicAssignment(fooTopicId, 0, 1, 2))) - .withAssignmentEpoch(10)) + .withAssignmentEpoch(10) + .withMetadataHash(computeGroupHash(Map.of( + fooTopicName, + computeTopicHash(fooTopicName, metadataImage)) + ))) .build(); // Member 2 joins the consumer group with a non-zero epoch @@ -1649,12 +2090,14 @@ public class GroupMetadataManagerTest { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .build(); + // Consumer group with one static member. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 6) - .build()) + .withMetadataImage(metadataImage) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId1) .setState(MemberState.STABLE) @@ -1670,7 +2113,11 @@ public class GroupMetadataManagerTest { .build()) .withAssignment(memberId1, mkAssignment( mkTopicAssignment(fooTopicId, 0, 1, 2))) - .withAssignmentEpoch(10)) + .withAssignmentEpoch(10) + .withMetadataHash(computeGroupHash(Map.of( + fooTopicName, + computeTopicHash(fooTopicName, metadataImage)) + ))) .build(); assertThrows(FencedInstanceIdException.class, () -> context.consumerGroupHeartbeat( @@ -1782,12 +2229,14 @@ public class GroupMetadataManagerTest { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .build(); + // Consumer group with one static member. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 6) - .build()) + .withMetadataImage(metadataImage) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId1) .setState(MemberState.STABLE) @@ -1803,7 +2252,11 @@ public class GroupMetadataManagerTest { .build()) .withAssignment(memberId1, mkAssignment( mkTopicAssignment(fooTopicId, 0, 1, 2))) - .withAssignmentEpoch(10)) + .withAssignmentEpoch(10) + .withMetadataHash(computeGroupHash(Map.of( + fooTopicName, + computeTopicHash(fooTopicName, metadataImage)) + ))) .build(); assertThrows(UnknownMemberIdException.class, () -> context.consumerGroupHeartbeat( @@ -1828,12 +2281,14 @@ public class GroupMetadataManagerTest { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .build(); + // Consumer group with one static member. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 6) - .build()) + .withMetadataImage(metadataImage) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId1) .setState(MemberState.STABLE) @@ -1849,7 +2304,11 @@ public class GroupMetadataManagerTest { .build()) .withAssignment(memberId1, mkAssignment( mkTopicAssignment(fooTopicId, 0, 1, 2))) - .withAssignmentEpoch(10)) + .withAssignmentEpoch(10) + .withMetadataHash(computeGroupHash(Map.of( + fooTopicName, + computeTopicHash(fooTopicName, metadataImage)) + ))) .build(); assertThrows(FencedInstanceIdException.class, () -> context.consumerGroupHeartbeat( @@ -1991,15 +2450,17 @@ public class GroupMetadataManagerTest { Uuid barTopicId = Uuid.randomUuid(); String barTopicName = "bar"; + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .addRacks() + .build(); + // Create a context with one consumer group containing two members. MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 6) - .addTopic(barTopicId, barTopicName, 3) - .addRacks() - .build()) + .withMetadataImage(metadataImage) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId1) .setState(MemberState.STABLE) @@ -2033,7 +2494,11 @@ public class GroupMetadataManagerTest { .withAssignment(memberId2, mkAssignment( mkTopicAssignment(fooTopicId, 3, 4, 5), mkTopicAssignment(barTopicId, 2))) - .withAssignmentEpoch(10)) + .withAssignmentEpoch(10) + .withMetadataHash(computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage), + barTopicName, computeTopicHash(barTopicName, metadataImage) + )))) .build(); // Prepare new assignment for the group. @@ -2422,14 +2887,16 @@ public class GroupMetadataManagerTest { Uuid barTopicId = Uuid.randomUuid(); String barTopicName = "bar"; + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .build(); + // Create a context with one consumer group containing two members. MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 6) - .addTopic(barTopicId, barTopicName, 3) - .build()) + .withMetadataImage(metadataImage) .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG, 2) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId1) @@ -2464,7 +2931,11 @@ public class GroupMetadataManagerTest { .withAssignment(memberId2, mkAssignment( mkTopicAssignment(fooTopicId, 3, 4, 5), mkTopicAssignment(barTopicId, 2))) - .withAssignmentEpoch(10)) + .withAssignmentEpoch(10) + .withMetadataHash(computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage), + barTopicName, computeTopicHash(barTopicName, metadataImage) + )))) .build(); assertThrows(GroupMaxSizeReachedException.class, () -> @@ -2486,6 +2957,10 @@ public class GroupMetadataManagerTest { Uuid fooTopicId = Uuid.randomUuid(); String fooTopicName = "foo"; + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .build(); + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) @@ -2498,7 +2973,9 @@ public class GroupMetadataManagerTest { .setState(MemberState.STABLE) .setSubscribedTopicNames(List.of(fooTopicName)) .build())); - context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)); + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage) + )))); assertEquals(ConsumerGroup.ConsumerGroupState.ASSIGNING, context.consumerGroupState(groupId)); @@ -2574,14 +3051,16 @@ public class GroupMetadataManagerTest { Uuid fooTopicId = Uuid.randomUuid(); String fooTopicName = "foo"; + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addRacks() + .build(); + // Create a context with one consumer group containing one member. MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 6) - .addRacks() - .build()) + .withMetadataImage(metadataImage) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId) .setState(MemberState.STABLE) @@ -2598,11 +3077,14 @@ public class GroupMetadataManagerTest { .withAssignment(memberId, mkAssignment( mkTopicAssignment(fooTopicId, 0, 1, 2))) .withAssignmentEpoch(10) - .withSubscriptionMetadata( - // foo only has 3 partitions stored in the metadata but foo has - // 6 partitions the metadata image. - Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3)) - )) + .withMetadataHash(computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, new MetadataImageBuilder() + // foo only has 3 partitions stored in the metadata but foo has + // 6 partitions the metadata image. + .addTopic(fooTopicId, fooTopicName, 3) + .addRacks() + .build()) + )))) .build(); // The metadata refresh flag should be true. @@ -2652,10 +3134,9 @@ public class GroupMetadataManagerTest { .build(); List expectedRecords = List.of( - GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, - Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6)) - ), - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0), + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage) + ))), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment( mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) )), @@ -2680,14 +3161,16 @@ public class GroupMetadataManagerTest { Uuid fooTopicId = Uuid.randomUuid(); String fooTopicName = "foo"; + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addRacks() + .build(); + // Create a context with one consumer group containing one member. MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 6) - .addRacks() - .build()) + .withMetadataImage(metadataImage) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId) .setState(MemberState.STABLE) @@ -2704,11 +3187,14 @@ public class GroupMetadataManagerTest { .withAssignment(memberId, mkAssignment( mkTopicAssignment(fooTopicId, 0, 1, 2))) .withAssignmentEpoch(10) - .withSubscriptionMetadata( - // foo only has 3 partitions stored in the metadata but foo has - // 6 partitions the metadata image. - Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3)) - )) + .withMetadataHash(computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, new MetadataImageBuilder() + // foo only has 3 partitions stored in the metadata but foo has + // 6 partitions the metadata image. + .addTopic(fooTopicId, fooTopicName, 3) + .addRacks() + .build()) + )))) .build(); // The metadata refresh flag should be true. @@ -2776,10 +3262,9 @@ public class GroupMetadataManagerTest { .build(); List expectedRecords = List.of( - GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, - Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6)) - ), - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0), + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage) + ))), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment( mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) )), @@ -3124,7 +3609,6 @@ public class GroupMetadataManagerTest { GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId), GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId), - GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of()), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, 0) ) ) @@ -3144,10 +3628,12 @@ public class GroupMetadataManagerTest { String fooTopicName = "foo"; String memberId = "foo-1"; + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .build(); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 6) - .build()) + .withMetadataImage(metadataImage) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder("foo-1") .setState(MemberState.STABLE) @@ -3162,7 +3648,10 @@ public class GroupMetadataManagerTest { .build()) .withAssignment(memberId, mkAssignment( mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) - .withAssignmentEpoch(10)) + .withAssignmentEpoch(10) + .withMetadataHash(computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage)) + ))) .build(); // Let's assume that all the records have been replayed and now @@ -3185,7 +3674,6 @@ public class GroupMetadataManagerTest { GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId), GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId), - GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of()), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0) ) ) @@ -3387,7 +3875,6 @@ public class GroupMetadataManagerTest { GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId), GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId), - GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of()), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, 0) ) ) @@ -3555,13 +4042,15 @@ public class GroupMetadataManagerTest { Uuid fooTopicId = Uuid.randomUuid(); String fooTopicName = "foo"; + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 3) + .addRacks() + .build(); + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 3) - .addRacks() - .build()) + .withMetadataImage(metadataImage) .build(); assignor.prepareGroupAssignment(new GroupAssignment( @@ -3664,7 +4153,9 @@ public class GroupMetadataManagerTest { GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId1), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId1), GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId1), - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 3, 0) + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 3, computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage) + ))) ) ) )), @@ -3683,11 +4174,12 @@ public class GroupMetadataManagerTest { Uuid barTopicId = Uuid.randomUuid(); String barTopicName = "bar"; + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .build(); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 6) - .addTopic(barTopicId, barTopicName, 3) - .build()) + .withMetadataImage(metadataImage) .withConsumerGroup(new ConsumerGroupBuilder("foo", 10) .withMember(new ConsumerGroupMember.Builder("foo-1") .setState(MemberState.UNREVOKED_PARTITIONS) @@ -3713,7 +4205,10 @@ public class GroupMetadataManagerTest { .build()) .withAssignment("foo-1", mkAssignment( mkTopicAssignment(fooTopicId, 3, 4, 5))) - .withAssignmentEpoch(10)) + .withAssignmentEpoch(10) + .withMetadataHash(computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage) + )))) .build(); // Let's assume that all the records have been replayed and now @@ -10239,11 +10734,10 @@ public class GroupMetadataManagerTest { // Create the new consumer group with member 1. GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1), - GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 1) - )), - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 0, 0), + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 0, computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage), + barTopicName, computeTopicHash(barTopicName, metadataImage) + ))), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 0), GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember1), @@ -10252,7 +10746,10 @@ public class GroupMetadataManagerTest { GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2), // Newly joining member 2 bumps the group epoch. A new target assignment is computed. - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0), + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage), + barTopicName, computeTopicHash(barTopicName, metadataImage) + ))), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, assignor.targetPartitions(memberId2)), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, assignor.targetPartitions(memberId1)), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1), @@ -10452,12 +10949,10 @@ public class GroupMetadataManagerTest { GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1), GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2), - GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 1) - )), - - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 0, 0), + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 0, computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage), + barTopicName, computeTopicHash(barTopicName, metadataImage) + ))), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, expectedMember2.assignedPartitions()), @@ -10470,7 +10965,10 @@ public class GroupMetadataManagerTest { GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember3), // Newly joining member 3 bumps the group epoch. A new target assignment is computed. - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0), + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage), + barTopicName, computeTopicHash(barTopicName, metadataImage) + ))), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, assignor.targetPartitions(memberId1)), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId3, assignor.targetPartitions(memberId3)), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1), @@ -10727,8 +11225,9 @@ public class GroupMetadataManagerTest { // Create the new consumer group with the static member. GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedClassicMember), - GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1))), - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, group.generationId(), 0), + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, group.generationId(), computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage) + ))), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, expectedClassicMember.assignedPartitions()), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, group.generationId()), GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedClassicMember), @@ -10752,6 +11251,10 @@ public class GroupMetadataManagerTest { GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedFinalConsumerMember) ); + assertEquals(expectedRecords.size(), result.records().size()); + assertRecordsEquals(expectedRecords.subList(0, 5), result.records().subList(0, 5)); + assertRecordsEquals(expectedRecords.subList(5, 10), result.records().subList(5, 10)); + assertRecordsEquals(expectedRecords, result.records()); context.assertSessionTimeout(groupId, newMemberId, 45000); } @@ -10847,11 +11350,10 @@ public class GroupMetadataManagerTest { // Create the new consumer group with member 1. GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1), - GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 1) - )), - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0), + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage), + barTopicName, computeTopicHash(barTopicName, metadataImage) + ))), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1), GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember1), @@ -10860,7 +11362,10 @@ public class GroupMetadataManagerTest { GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2), // Newly joining member 2 bumps the group epoch. A new target assignment is computed. - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, 0), + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage), + barTopicName, computeTopicHash(barTopicName, metadataImage) + ))), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, Map.of()), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 2), @@ -10937,16 +11442,18 @@ public class GroupMetadataManagerTest { mkTopicAssignment(barTopicId, 2))) .build(); + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .addRacks() + .build(); + // Consumer group with two static members. // Member 1 uses the classic protocol and member 2 uses the consumer protocol. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, ConsumerGroupMigrationPolicy.UPGRADE.toString()) .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(new NoOpPartitionAssignor())) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 6) - .addTopic(barTopicId, barTopicName, 3) - .addRacks() - .build()) + .withMetadataImage(metadataImage) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(member1) .withMember(member2) @@ -10956,16 +11463,13 @@ public class GroupMetadataManagerTest { .withAssignment(memberId2, mkAssignment( mkTopicAssignment(fooTopicId, 3, 4, 5), mkTopicAssignment(barTopicId, 2))) - .withAssignmentEpoch(10)) + .withAssignmentEpoch(10) + .withMetadataHash(computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage), + barTopicName, computeTopicHash(barTopicName, metadataImage) + )))) .build(); - context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 3) - ))); - - context.commit(); - // The member 1 with the classic protocol upgrades, heartbeating with new protocol. CoordinatorResult result = context.consumerGroupHeartbeat( new ConsumerGroupHeartbeatRequestData() @@ -11230,12 +11734,10 @@ public class GroupMetadataManagerTest { GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1), GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2), - GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 1) - )), - - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0), + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage), + barTopicName, computeTopicHash(barTopicName, metadataImage) + ))), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, expectedMember2.assignedPartitions()), @@ -11248,7 +11750,10 @@ public class GroupMetadataManagerTest { GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember3), // Newly joining member 3 bumps the group epoch. A new target assignment is computed. - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, 0), + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage), + barTopicName, computeTopicHash(barTopicName, metadataImage) + ))), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, assignor.targetPartitions(memberId1)), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId3, assignor.targetPartitions(memberId3)), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 2), @@ -11432,16 +11937,18 @@ public class GroupMetadataManagerTest { mkTopicAssignment(barTopicId, 2))) .build(); + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .addRacks() + .build(); + // Consumer group with two members. // Member 1 uses the classic protocol and member 2 uses the consumer protocol. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, ConsumerGroupMigrationPolicy.DOWNGRADE.toString()) .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 6) - .addTopic(barTopicId, barTopicName, 3) - .addRacks() - .build()) + .withMetadataImage(metadataImage) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(member1) .withMember(member2) @@ -11451,15 +11958,13 @@ public class GroupMetadataManagerTest { .withAssignment(memberId2, mkAssignment( mkTopicAssignment(fooTopicId, 3, 4, 5), mkTopicAssignment(barTopicId, 2))) - .withAssignmentEpoch(10)) + .withAssignmentEpoch(10) + .withMetadataHash(computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage), + barTopicName, computeTopicHash(barTopicName, metadataImage) + )))) .build(); - context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 3) - ))); - - context.commit(); ConsumerGroup consumerGroup = context.groupMetadataManager.consumerGroup(groupId); // Member 2 leaves the consumer group, triggering the downgrade. @@ -11612,16 +12117,18 @@ public class GroupMetadataManagerTest { mkTopicAssignment(barTopicId, 2))) .build(); + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .addRacks() + .build(); + // Consumer group with two members. // Member 1 uses the classic protocol and member 2 uses the consumer protocol. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, ConsumerGroupMigrationPolicy.DOWNGRADE.toString()) .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 6) - .addTopic(barTopicId, barTopicName, 3) - .addRacks() - .build()) + .withMetadataImage(metadataImage) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(member1) .withMember(member2) @@ -11631,16 +12138,13 @@ public class GroupMetadataManagerTest { .withAssignment(memberId2, mkAssignment( mkTopicAssignment(fooTopicId, 3, 4, 5), mkTopicAssignment(barTopicId, 2))) - .withAssignmentEpoch(10)) + .withAssignmentEpoch(10) + .withMetadataHash(computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage), + barTopicName, computeTopicHash(barTopicName, metadataImage) + )))) .build(); - context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 3) - ))); - - context.commit(); - // Session timer is scheduled on the heartbeat. context.consumerGroupHeartbeat( new ConsumerGroupHeartbeatRequestData() @@ -11793,17 +12297,19 @@ public class GroupMetadataManagerTest { mkTopicAssignment(barTopicId, 2))) .build(); + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .addTopic(zarTopicId, zarTopicName, 1) + .addRacks() + .build(); + // Consumer group with two members. // Member 1 uses the classic protocol and member 2 uses the consumer protocol. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, ConsumerGroupMigrationPolicy.DOWNGRADE.toString()) .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 6) - .addTopic(barTopicId, barTopicName, 3) - .addTopic(zarTopicId, zarTopicName, 1) - .addRacks() - .build()) + .withMetadataImage(metadataImage) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(member1) .withMember(member2) @@ -11813,17 +12319,14 @@ public class GroupMetadataManagerTest { .withAssignment(memberId2, mkAssignment( mkTopicAssignment(fooTopicId, 3, 4, 5), mkTopicAssignment(barTopicId, 2))) - .withAssignmentEpoch(10)) + .withAssignmentEpoch(10) + .withMetadataHash(computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage), + barTopicName, computeTopicHash(barTopicName, metadataImage), + zarTopicName, computeTopicHash(zarTopicName, metadataImage) + )))) .build(); - context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 3), - zarTopicName, new TopicMetadata(zarTopicId, zarTopicName, 1) - ))); - - context.commit(); - // Prepare the new assignment. assignor.prepareGroupAssignment(new GroupAssignment(Map.of( memberId1, new MemberAssignmentImpl(mkAssignment( @@ -11995,16 +12498,18 @@ public class GroupMetadataManagerTest { mkTopicAssignment(fooTopicId, 3, 4, 5))) .build(); + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 2) + .addRacks() + .build(); + // Consumer group with two members. // Member 1 uses the classic protocol and static member 2 uses the consumer protocol. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, ConsumerGroupMigrationPolicy.DOWNGRADE.toString()) .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 6) - .addTopic(barTopicId, barTopicName, 2) - .addRacks() - .build()) + .withMetadataImage(metadataImage) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(member1) .withMember(oldMember2) @@ -12013,15 +12518,14 @@ public class GroupMetadataManagerTest { mkTopicAssignment(barTopicId, 0, 1))) .withAssignment(oldMemberId2, mkAssignment( mkTopicAssignment(fooTopicId, 3, 4, 5))) - .withAssignmentEpoch(10)) + .withAssignmentEpoch(10) + .withMetadataHash(computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage), + barTopicName, computeTopicHash(barTopicName, metadataImage) + )))) .build(); context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE, 10); - context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 2) - ))); - context.commit(); // A new member using classic protocol with the same instance id joins, scheduling the downgrade. JoinGroupRequestData joinRequest = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() @@ -12233,17 +12737,19 @@ public class GroupMetadataManagerTest { for (short version = ConsumerProtocolSubscription.LOWEST_SUPPORTED_VERSION; version <= ConsumerProtocolSubscription.HIGHEST_SUPPORTED_VERSION; version++) { String memberId = Uuid.randomUuid().toString(); MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addTopic(barTopicId, barTopicName, 1) + .addRacks() + .build(); + long fooTopicHash = computeTopicHash(fooTopicName, metadataImage); + long barTopicHash = computeTopicHash(barTopicName, metadataImage); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 2) - .addTopic(barTopicId, barTopicName, 1) - .addRacks() - .build()) + .withMetadataImage(metadataImage) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) - .withSubscriptionMetadata( - Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2)) - ) .withMember(new ConsumerGroupMember.Builder(memberId) .setState(MemberState.STABLE) .setMemberEpoch(10) @@ -12253,7 +12759,10 @@ public class GroupMetadataManagerTest { .build()) .withAssignment(memberId, mkAssignment( mkTopicAssignment(fooTopicId, 0, 1))) - .withAssignmentEpoch(10)) + .withAssignmentEpoch(10) + .withMetadataHash(computeGroupHash(Map.of( + fooTopicName, fooTopicHash + )))) .build(); JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() @@ -12322,11 +12831,10 @@ public class GroupMetadataManagerTest { assertUnorderedRecordsEquals( List.of( List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember)), - List.of(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 1) - ))), - List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)), + List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, computeGroupHash(Map.of( + fooTopicName, fooTopicHash, + barTopicName, barTopicHash + )))), List.of( GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, assignor.targetPartitions(memberId)), @@ -12373,16 +12881,14 @@ public class GroupMetadataManagerTest { )) ))); + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addRacks() + .build(); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 2) - .addRacks() - .build()) + .withMetadataImage(metadataImage) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) - .withSubscriptionMetadata( - Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2)) - ) .withMember(new ConsumerGroupMember.Builder(memberId) .setState(MemberState.STABLE) .setMemberEpoch(10) @@ -12392,7 +12898,10 @@ public class GroupMetadataManagerTest { .build()) .withAssignment(memberId, mkAssignment( mkTopicAssignment(fooTopicId, 0, 1))) - .withAssignmentEpoch(10)) + .withAssignmentEpoch(10) + .withMetadataHash(computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage) + )))) .build(); context.commit(); @@ -12426,17 +12935,18 @@ public class GroupMetadataManagerTest { String memberId = Uuid.randomUuid().toString(); String instanceId = "instance-id"; + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addTopic(barTopicId, barTopicName, 1) + .addRacks() + .build(); + long fooTopicHash = computeTopicHash(fooTopicName, metadataImage); + long barTopicHash = computeTopicHash(barTopicName, metadataImage); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(new NoOpPartitionAssignor())) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 2) - .addTopic(barTopicId, barTopicName, 1) - .addRacks() - .build()) + .withMetadataImage(metadataImage) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) - .withSubscriptionMetadata( - Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2)) - ) .withMember(new ConsumerGroupMember.Builder(memberId) .setState(MemberState.STABLE) .setMemberEpoch(10) @@ -12446,7 +12956,8 @@ public class GroupMetadataManagerTest { .build()) .withAssignment(memberId, mkAssignment( mkTopicAssignment(fooTopicId, 0, 1))) - .withAssignmentEpoch(10)) + .withAssignmentEpoch(10) + .withMetadataHash(computeGroupHash(Map.of(fooTopicName, fooTopicHash)))) .build(); JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder() @@ -12483,11 +12994,10 @@ public class GroupMetadataManagerTest { List expectedRecords = List.of( GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember), - GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 1) - )), - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0), + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, computeGroupHash(Map.of( + fooTopicName, fooTopicHash, + barTopicName, barTopicHash + ))), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, newMemberId, Map.of()), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11), @@ -12516,19 +13026,18 @@ public class GroupMetadataManagerTest { Uuid fooTopicId = Uuid.randomUuid(); String fooTopicName = "foo"; + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addRacks() + .build(); + String memberId = Uuid.randomUuid().toString(); String instanceId = "instance-id"; GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, ConsumerGroupMigrationPolicy.DISABLED.toString()) .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(new NoOpPartitionAssignor())) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 2) - .addRacks() - .build()) + .withMetadataImage(metadataImage) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) - .withSubscriptionMetadata( - Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2)) - ) .withMember(new ConsumerGroupMember.Builder(memberId) .setInstanceId(instanceId) .setState(MemberState.STABLE) @@ -12541,7 +13050,10 @@ public class GroupMetadataManagerTest { .build()) .withAssignment(memberId, mkAssignment( mkTopicAssignment(fooTopicId, 0, 1))) - .withAssignmentEpoch(10)) + .withAssignmentEpoch(10) + .withMetadataHash(computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage) + )))) .build(); context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE, 10); @@ -12636,21 +13148,23 @@ public class GroupMetadataManagerTest { String memberId2 = Uuid.randomUuid().toString(); String instanceId = "instance-id"; + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addTopic(barTopicId, barTopicName, 1) + .addTopic(zarTopicId, zarTopicName, 1) + .addRacks() + .build(); + long groupMetadataHash = computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage), + barTopicName, computeTopicHash(barTopicName, metadataImage), + zarTopicName, computeTopicHash(zarTopicName, metadataImage) + )); + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 2) - .addTopic(barTopicId, barTopicName, 1) - .addTopic(zarTopicId, zarTopicName, 1) - .addRacks() - .build()) + .withMetadataImage(metadataImage) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) - .withSubscriptionMetadata(Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 1), - zarTopicName, new TopicMetadata(zarTopicId, zarTopicName, 1) - )) .withMember(new ConsumerGroupMember.Builder(memberId1) .setInstanceId(instanceId) .setState(MemberState.STABLE) @@ -12690,7 +13204,8 @@ public class GroupMetadataManagerTest { mkTopicAssignment(barTopicId, 0))) .withAssignment(memberId2, mkAssignment( mkTopicAssignment(fooTopicId, 1))) - .withAssignmentEpoch(10)) + .withAssignmentEpoch(10) + .withMetadataHash(groupMetadataHash)) .build(); ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId); group.setMetadataRefreshDeadline(Long.MAX_VALUE, 11); @@ -12743,7 +13258,7 @@ public class GroupMetadataManagerTest { assertUnorderedRecordsEquals( List.of( List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember)), - List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)), + List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, groupMetadataHash)), List.of( GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, mkAssignment( mkTopicAssignment(fooTopicId, 0), @@ -12862,20 +13377,21 @@ public class GroupMetadataManagerTest { String memberId1 = Uuid.randomUuid().toString(); String memberId2 = Uuid.randomUuid().toString(); + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addTopic(barTopicId, barTopicName, 1) + .addTopic(zarTopicId, zarTopicName, 1) + .addRacks() + .build(); + long fooTopicHash = computeTopicHash(fooTopicName, metadataImage); + long barTopicHash = computeTopicHash(barTopicName, metadataImage); + long zarTopicHash = computeTopicHash(zarTopicName, metadataImage); + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 2) - .addTopic(barTopicId, barTopicName, 1) - .addTopic(zarTopicId, zarTopicName, 1) - .addRacks() - .build()) + .withMetadataImage(metadataImage) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) - .withSubscriptionMetadata(Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 1) - )) .withMember(new ConsumerGroupMember.Builder(memberId1) .setState(MemberState.STABLE) .setMemberEpoch(10) @@ -12914,7 +13430,11 @@ public class GroupMetadataManagerTest { mkTopicAssignment(barTopicId, 0))) .withAssignment(memberId2, mkAssignment( mkTopicAssignment(fooTopicId, 1))) - .withAssignmentEpoch(10)) + .withAssignmentEpoch(10) + .withMetadataHash(computeGroupHash(Map.of( + fooTopicName, fooTopicHash, + barTopicName, barTopicHash + )))) .build(); ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId); group.setMetadataRefreshDeadline(Long.MAX_VALUE, 11); @@ -12969,12 +13489,11 @@ public class GroupMetadataManagerTest { assertUnorderedRecordsEquals( List.of( List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1)), - List.of(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 1), - zarTopicName, new TopicMetadata(zarTopicId, zarTopicName, 1) - ))), - List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)), + List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, computeGroupHash(Map.of( + fooTopicName, fooTopicHash, + barTopicName, barTopicHash, + zarTopicName, zarTopicHash + )))), List.of( GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, mkAssignment( @@ -13097,20 +13616,21 @@ public class GroupMetadataManagerTest { String memberId1 = Uuid.randomUuid().toString(); String memberId2 = Uuid.randomUuid().toString(); + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addTopic(barTopicId, barTopicName, 1) + .addTopic(zarTopicId, zarTopicName, 1) + .addRacks() + .build(); + long fooTopicHash = computeTopicHash(fooTopicName, metadataImage); + long barTopicHash = computeTopicHash(barTopicName, metadataImage); + long zarTopicHash = computeTopicHash(zarTopicName, metadataImage); + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 2) - .addTopic(barTopicId, barTopicName, 1) - .addTopic(zarTopicId, zarTopicName, 1) - .addRacks() - .build()) + .withMetadataImage(metadataImage) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) - .withSubscriptionMetadata(Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 1) - )) .withMember(new ConsumerGroupMember.Builder(memberId1) .setState(MemberState.STABLE) .setMemberEpoch(10) @@ -13149,7 +13669,11 @@ public class GroupMetadataManagerTest { mkTopicAssignment(barTopicId, 0))) .withAssignment(memberId2, mkAssignment( mkTopicAssignment(fooTopicId, 1))) - .withAssignmentEpoch(10)) + .withAssignmentEpoch(10) + .withMetadataHash(computeGroupHash(Map.of( + fooTopicName, fooTopicHash, + barTopicName, barTopicHash + )))) .build(); ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId); group.setMetadataRefreshDeadline(Long.MAX_VALUE, 11); @@ -13204,12 +13728,11 @@ public class GroupMetadataManagerTest { assertUnorderedRecordsEquals( List.of( List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1)), - List.of(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 1), - zarTopicName, new TopicMetadata(zarTopicId, zarTopicName, 1) - ))), - List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)), + List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, computeGroupHash(Map.of( + fooTopicName, fooTopicHash, + barTopicName, barTopicHash, + zarTopicName, zarTopicHash + )))), List.of( GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, mkAssignment( @@ -14169,16 +14692,18 @@ public class GroupMetadataManagerTest { .setAssignedPartitions(mkAssignment(mkTopicAssignment(barTopicId, 0))) .build(); + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addTopic(barTopicId, barTopicName, 1) + .addRacks() + .build(); + // Consumer group with three members. // Dynamic member 1 uses the classic protocol. // Static member 2 uses the classic protocol. // Static member 3 uses the consumer protocol. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 2) - .addTopic(barTopicId, barTopicName, 1) - .addRacks() - .build()) + .withMetadataImage(metadataImage) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(member1) .withMember(member2) @@ -14186,13 +14711,13 @@ public class GroupMetadataManagerTest { .withAssignment(memberId1, mkAssignment(mkTopicAssignment(fooTopicId, 0))) .withAssignment(memberId2, mkAssignment(mkTopicAssignment(fooTopicId, 1))) .withAssignment(memberId3, mkAssignment(mkTopicAssignment(barTopicId, 0))) - .withAssignmentEpoch(10)) + .withAssignmentEpoch(10) + .withMetadataHash(computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage), + barTopicName, computeTopicHash(barTopicName, metadataImage) + )))) .build(); context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE, 10); - context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 1) - ))); // Member 1 joins to schedule the sync timeout and the heartbeat timeout. context.sendClassicGroupJoin( @@ -14291,8 +14816,6 @@ public class GroupMetadataManagerTest { GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId3), GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId3) ), - // Update subscription metadata. - List.of(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of())), // Bump the group epoch. List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)) ); @@ -14353,26 +14876,30 @@ public class GroupMetadataManagerTest { .setAssignedPartitions(mkAssignment(mkTopicAssignment(barTopicId, 0))) .build(); + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addTopic(barTopicId, barTopicName, 1) + .addRacks() + .build(); + long fooTopicHash = computeTopicHash(fooTopicName, metadataImage); + long barTopicHash = computeTopicHash(barTopicName, metadataImage); + // Consumer group with two members. // Member 1 uses the classic protocol and member 2 uses the consumer protocol. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 2) - .addTopic(barTopicId, barTopicName, 1) - .addRacks() - .build()) + .withMetadataImage(metadataImage) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(member1) .withMember(member2) .withAssignment(memberId1, mkAssignment(mkTopicAssignment(fooTopicId, 0))) .withAssignment(memberId2, mkAssignment(mkTopicAssignment(barTopicId, 0))) - .withAssignmentEpoch(10)) + .withAssignmentEpoch(10) + .withMetadataHash(computeGroupHash(Map.of( + fooTopicName, fooTopicHash, + barTopicName, barTopicHash + )))) .build(); context.groupMetadataManager.consumerGroup(groupId).setMetadataRefreshDeadline(Long.MAX_VALUE, 10); - context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 1) - ))); // Member 1 leaves the group. CoordinatorResult leaveResult = context.sendClassicGroupLeave( @@ -14399,12 +14926,10 @@ public class GroupMetadataManagerTest { GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId1), GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId1), GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId1), - // Update the subscription metadata. - GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, - Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2)) - ), // Bump the group epoch. - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0) + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, computeGroupHash(Map.of( + fooTopicName, fooTopicHash + ))) ); assertEquals(expectedRecords, leaveResult.records()); } @@ -14538,6 +15063,12 @@ public class GroupMetadataManagerTest { .setAssignedPartitions(mkAssignment(mkTopicAssignment(barTopicId, 1))) .build(); + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .addTopic(barTopicId, barTopicName, 2) + .addRacks() + .build(); + // Consumer group with four members. // Dynamic member 1 uses the classic protocol. // Static member 2 uses the classic protocol. @@ -14546,11 +15077,7 @@ public class GroupMetadataManagerTest { GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, ConsumerGroupMigrationPolicy.DOWNGRADE.toString()) .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 2) - .addTopic(barTopicId, barTopicName, 2) - .addRacks() - .build()) + .withMetadataImage(metadataImage) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(member1) .withMember(member2) @@ -14560,15 +15087,13 @@ public class GroupMetadataManagerTest { .withAssignment(memberId2, mkAssignment(mkTopicAssignment(fooTopicId, 1))) .withAssignment(memberId3, mkAssignment(mkTopicAssignment(barTopicId, 0))) .withAssignment(memberId4, mkAssignment(mkTopicAssignment(barTopicId, 1))) - .withAssignmentEpoch(10)) + .withAssignmentEpoch(10) + .withMetadataHash(computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage), + barTopicName, computeTopicHash(barTopicName, metadataImage) + )))) .build(); - context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 2) - ))); - - context.commit(); ConsumerGroup consumerGroup = context.groupMetadataManager.consumerGroup(groupId); // Member 2, member 3 and member 4 leave the group, triggering the downgrade. @@ -18369,17 +18894,21 @@ public class GroupMetadataManagerTest { @Test public void testReplayConsumerGroupPartitionMetadata() { + String groupId = "foo"; GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .build(); - Map metadata = Map.of( - "bar", - new TopicMetadata(Uuid.randomUuid(), "bar", 10) - ); - // The group is created if it does not exist. - context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord("foo", metadata)); - assertEquals(metadata, context.groupMetadataManager.consumerGroup("foo").subscriptionMetadata()); + ConsumerGroupPartitionMetadataValue consumerGroupPartitionMetadataValue = new ConsumerGroupPartitionMetadataValue(); + consumerGroupPartitionMetadataValue.topics().add(new ConsumerGroupPartitionMetadataValue.TopicMetadata() + .setTopicId(Uuid.randomUuid()) + .setTopicName("bar") + .setNumPartitions(10)); + context.replay(CoordinatorRecord.record( + new ConsumerGroupPartitionMetadataKey().setGroupId(groupId), + new ApiMessageAndVersion(consumerGroupPartitionMetadataValue, (short) 0) + )); + assertTrue(context.groupMetadataManager.consumerGroup(groupId).hasSubscriptionMetadataRecord()); } @Test @@ -18393,6 +18922,18 @@ public class GroupMetadataManagerTest { assertThrows(GroupIdNotFoundException.class, () -> context.groupMetadataManager.consumerGroup("foo")); } + @Test + public void testReplayConsumerGroupPartitionMetadataTombstoneWithExistentGroup() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConsumerGroup(new ConsumerGroupBuilder("foo", 10)) + .build(); + + // The group exists. Replaying the ConsumerGroupPartitionMetadata tombstone + // doesn't set addSubscriptionMetadataTombstoneRecord flag. + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord("foo")); + assertFalse(context.groupMetadataManager.consumerGroup("foo").hasSubscriptionMetadataRecord()); + } + @Test public void testReplayConsumerGroupTargetAssignmentMember() { GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() @@ -19629,12 +20170,17 @@ public class GroupMetadataManagerTest { Uuid fooTopicId = Uuid.randomUuid(); String fooTopicName = "foo"; + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .build(12345L); + long groupMetadataHash = computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage) + )); + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 6) - .build(12345L)) + .withMetadataImage(metadataImage) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId1) .setState(MemberState.STABLE) @@ -19650,7 +20196,8 @@ public class GroupMetadataManagerTest { .build()) .withAssignment(memberId1, mkAssignment( mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) - .withAssignmentEpoch(10)) + .withAssignmentEpoch(10) + .withMetadataHash(groupMetadataHash)) .build(); // Member 2 joins the consumer group with a new regular expression. @@ -19709,7 +20256,7 @@ public class GroupMetadataManagerTest { ) ), // The group epoch is bumped. - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0) + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, groupMetadataHash) )) ) ), @@ -19727,13 +20274,15 @@ public class GroupMetadataManagerTest { Uuid barTopicId = Uuid.randomUuid(); String barTopicName = "bar"; + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .build(12345L); + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 6) - .addTopic(barTopicId, barTopicName, 3) - .build(12345L)) + .withMetadataImage(metadataImage) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId1) .setState(MemberState.STABLE) @@ -19816,16 +20365,11 @@ public class GroupMetadataManagerTest { context.time.milliseconds() ) ), - // The updated subscription metadata. - GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord( - groupId, - Map.of( - "foo", new TopicMetadata(fooTopicId, fooTopicName, 6), - "bar", new TopicMetadata(barTopicId, barTopicName, 3) - ) - ), // The group epoch is bumped. - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0) + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage), + barTopicName, computeTopicHash(barTopicName, metadataImage) + ))) ), task.result.records() ); @@ -19844,12 +20388,14 @@ public class GroupMetadataManagerTest { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); assignor.prepareGroupAssignment(new GroupAssignment(Map.of())); + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .build(12345L); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 6) - .addTopic(barTopicId, barTopicName, 3) - .build(12345L)) + .withMetadataImage(metadataImage) .build(); // Member 1 joins. @@ -19986,16 +20532,11 @@ public class GroupMetadataManagerTest { context.time.milliseconds() ) ), - // The updated subscription metadata. - GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord( - groupId, - Map.of( - "foo", new TopicMetadata(fooTopicId, fooTopicName, 6), - "bar", new TopicMetadata(barTopicId, barTopicName, 3) - ) - ), // The group epoch is bumped. - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, 0) + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage), + barTopicName, computeTopicHash(barTopicName, metadataImage) + ))) ), task.result.records() ); @@ -20021,6 +20562,8 @@ public class GroupMetadataManagerTest { .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) .build(1L); + long fooTopicHash = computeTopicHash(fooTopicName, image); + long barTopicHash = computeTopicHash(barTopicName, image); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) @@ -20054,14 +20597,15 @@ public class GroupMetadataManagerTest { Set.of(fooTopicName), 0L, 0L)) .withResolvedRegularExpression("bar*", new ResolvedRegularExpression( Set.of(barTopicName), 0L, 0L)) - .withSubscriptionMetadata(Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 3))) .withAssignment(memberId1, mkAssignment( mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) .withAssignment(memberId2, mkAssignment( mkTopicAssignment(barTopicId, 0, 1, 2))) - .withAssignmentEpoch(10)) + .withAssignmentEpoch(10) + .withMetadataHash(computeGroupHash(Map.of( + fooTopicName, fooTopicHash, + barTopicName, barTopicHash + )))) .build(); // Update metadata image. @@ -20129,15 +20673,11 @@ public class GroupMetadataManagerTest { ) ) ), - List.of(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord( - groupId, - Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 3), - foooTopicName, new TopicMetadata(foooTopicId, foooTopicName, 1) - ) - )), - List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)) + List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, computeGroupHash(Map.of( + fooTopicName, fooTopicHash, + barTopicName, barTopicHash, + foooTopicName, computeTopicHash(foooTopicName, newImage) + )))) ), task.result.records() ); @@ -20154,15 +20694,19 @@ public class GroupMetadataManagerTest { String fooTopicName = "foo"; String barTopicName = "bar"; + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .build(12345L); + long fooTopicHash = computeTopicHash(fooTopicName, metadataImage); + long barTopicHash = computeTopicHash(barTopicName, metadataImage); + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); Authorizer authorizer = mock(Authorizer.class); Plugin authorizerPlugin = Plugin.wrapInstance(authorizer, null, "authorizer.class.name"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 6) - .addTopic(barTopicId, barTopicName, 3) - .build(12345L)) + .withMetadataImage(metadataImage) .withAuthorizerPlugin(authorizerPlugin) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId1) @@ -20189,15 +20733,14 @@ public class GroupMetadataManagerTest { .setAssignedPartitions(mkAssignment( mkTopicAssignment(fooTopicId, 3, 4, 5))) .build()) - .withSubscriptionMetadata(Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6))) .withAssignment(memberId1, mkAssignment( mkTopicAssignment(fooTopicId, 0, 1, 2))) .withAssignment(memberId2, mkAssignment( mkTopicAssignment(fooTopicId, 3, 4, 5))) .withResolvedRegularExpression("foo*", new ResolvedRegularExpression( Set.of(fooTopicName), 0L, 0L)) - .withAssignmentEpoch(10)) + .withAssignmentEpoch(10) + .withMetadataHash(computeGroupHash(Map.of(fooTopicName, fooTopicHash)))) .build(); // sleep for more than REGEX_BATCH_REFRESH_INTERVAL_MS @@ -20274,7 +20817,9 @@ public class GroupMetadataManagerTest { context.time.milliseconds() ) ), - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0) + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, computeGroupHash(Map.of( + fooTopicName, fooTopicHash + ))) )) ) ), @@ -20357,14 +20902,10 @@ public class GroupMetadataManagerTest { context.time.milliseconds() ) ), - GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord( - groupId, - Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 3) - ) - ), - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12, 0) + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12, computeGroupHash(Map.of( + fooTopicName, fooTopicHash, + barTopicName, barTopicHash + ))) ), context.processTasks().get(0).result.records() ); @@ -20384,12 +20925,16 @@ public class GroupMetadataManagerTest { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); assignor.prepareGroupAssignment(new GroupAssignment(Map.of())); + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .build(1L); + long fooTopicHash = computeTopicHash(fooTopicName, metadataImage); + long barTopicHash = computeTopicHash(barTopicName, metadataImage); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 6) - .addTopic(barTopicId, barTopicName, 3) - .build(1L)) + .withMetadataImage(metadataImage) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId1) .setState(MemberState.STABLE) @@ -20419,14 +20964,15 @@ public class GroupMetadataManagerTest { Set.of(fooTopicName), 0L, 0L)) .withResolvedRegularExpression("bar*", new ResolvedRegularExpression( Set.of(barTopicName), 0L, 0L)) - .withSubscriptionMetadata(Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 3))) .withAssignment(memberId1, mkAssignment( mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) .withAssignment(memberId2, mkAssignment( mkTopicAssignment(barTopicId, 0, 1, 2))) - .withAssignmentEpoch(10)) + .withAssignmentEpoch(10) + .withMetadataHash(computeGroupHash(Map.of( + fooTopicName, fooTopicHash, + barTopicName, barTopicHash + )))) .build(); // Setup the timers. @@ -20451,10 +20997,9 @@ public class GroupMetadataManagerTest { GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId1), GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId1), GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*"), - GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, - Map.of(barTopicName, new TopicMetadata(barTopicId, barTopicName, 3)) - ), - GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0) + GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, computeGroupHash(Map.of( + barTopicName, barTopicHash + ))) ); assertRecordsEquals(expectedRecords, result.records()); @@ -20473,7 +21018,6 @@ public class GroupMetadataManagerTest { GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId2), GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId2), GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "bar*"), - GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of()), GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 12, 0) ) ) @@ -20498,12 +21042,16 @@ public class GroupMetadataManagerTest { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); assignor.prepareGroupAssignment(new GroupAssignment(Map.of())); + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .addTopic(barTopicId, barTopicName, 3) + .build(1L); + long fooTopicHash = computeTopicHash(fooTopicName, metadataImage); + long barTopicHash = computeTopicHash(barTopicName, metadataImage); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(fooTopicId, fooTopicName, 6) - .addTopic(barTopicId, barTopicName, 3) - .build(1L)) + .withMetadataImage(metadataImage) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId1) .setState(MemberState.STABLE) @@ -20561,15 +21109,15 @@ public class GroupMetadataManagerTest { Set.of(fooTopicName), 0L, 0L)) .withResolvedRegularExpression("bar*", new ResolvedRegularExpression( Set.of(barTopicName), 0L, 0L)) - .withSubscriptionMetadata(Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 3) - )) .withAssignment(memberId1, mkAssignment( mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) .withAssignment(memberId2, mkAssignment( mkTopicAssignment(barTopicId, 0, 1, 2))) - .withAssignmentEpoch(10)) + .withAssignmentEpoch(10) + .withMetadataHash(computeGroupHash(Map.of( + fooTopicName, fooTopicHash, + barTopicName, barTopicHash + )))) .build(); // Remove members. @@ -20623,12 +21171,10 @@ public class GroupMetadataManagerTest { ), // Remove regex. List.of(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*")), - // Updated subscription metadata. - List.of(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( - barTopicName, new TopicMetadata(barTopicId, barTopicName, 3) - ))), // Bumped epoch. - List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)) + List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, computeGroupHash(Map.of( + barTopicName, barTopicHash + )))) ), result.records() ); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java index dc0670656d0..d1bbc012717 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java @@ -556,7 +556,7 @@ public class GroupMetadataManagerTestContext { groupConfigManager ); - consumerGroupBuilders.forEach(builder -> builder.build(metadataImage.topics()).forEach(context::replay)); + consumerGroupBuilders.forEach(builder -> builder.build().forEach(context::replay)); shareGroupBuilders.forEach(builder -> builder.build(metadataImage.topics()).forEach(context::replay)); streamsGroupBuilders.forEach(builder -> builder.build().forEach(context::replay)); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java index 7da6e5e28dd..e086f283aac 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupBuilder.java @@ -20,9 +20,6 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; import org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers; import org.apache.kafka.coordinator.group.modern.Assignment; -import org.apache.kafka.coordinator.group.modern.TopicMetadata; -import org.apache.kafka.image.TopicImage; -import org.apache.kafka.image.TopicsImage; import java.util.ArrayList; import java.util.HashMap; @@ -36,7 +33,7 @@ public class ConsumerGroupBuilder { private int assignmentEpoch; private final Map members = new HashMap<>(); private final Map assignments = new HashMap<>(); - private Map subscriptionMetadata; + private long metadataHash = 0L; private final Map resolvedRegularExpressions = new HashMap<>(); public ConsumerGroupBuilder(String groupId, int groupEpoch) { @@ -58,8 +55,8 @@ public class ConsumerGroupBuilder { return this; } - public ConsumerGroupBuilder withSubscriptionMetadata(Map subscriptionMetadata) { - this.subscriptionMetadata = subscriptionMetadata; + public ConsumerGroupBuilder withMetadataHash(long metadataHash) { + this.metadataHash = metadataHash; return this; } @@ -73,7 +70,7 @@ public class ConsumerGroupBuilder { return this; } - public List build(TopicsImage topicsImage) { + public List build() { List records = new ArrayList<>(); // Add subscription records for members. @@ -86,29 +83,8 @@ public class ConsumerGroupBuilder { records.add(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord(groupId, regex, resolvedRegularExpression)) ); - // Add subscription metadata. - if (subscriptionMetadata == null) { - subscriptionMetadata = new HashMap<>(); - members.forEach((memberId, member) -> - member.subscribedTopicNames().forEach(topicName -> { - TopicImage topicImage = topicsImage.getTopic(topicName); - if (topicImage != null) { - subscriptionMetadata.put(topicName, new TopicMetadata( - topicImage.id(), - topicImage.name(), - topicImage.partitions().size() - )); - } - }) - ); - } - - if (!subscriptionMetadata.isEmpty()) { - records.add(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata)); - } - // Add group epoch record. - records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, groupEpoch, 0)); + records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, groupEpoch, metadataHash)); // Add target assignment records. assignments.forEach((memberId, assignment) -> diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java index ccbed0cf9fd..9fd743ac449 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java @@ -46,6 +46,7 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataV import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; import org.apache.kafka.coordinator.group.modern.Assignment; import org.apache.kafka.coordinator.group.modern.MemberState; +import org.apache.kafka.coordinator.group.modern.ModernGroup; import org.apache.kafka.coordinator.group.modern.SubscriptionCount; import org.apache.kafka.coordinator.group.modern.TopicMetadata; import org.apache.kafka.image.MetadataImage; @@ -57,6 +58,7 @@ import org.junit.jupiter.params.provider.CsvSource; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -70,6 +72,8 @@ import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.coordinator.group.Assertions.assertUnorderedRecordsEquals; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; +import static org.apache.kafka.coordinator.group.Utils.computeGroupHash; +import static org.apache.kafka.coordinator.group.Utils.computeTopicHash; import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS; import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS; import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABLE; @@ -1532,8 +1536,8 @@ public class ConsumerGroupTest { new SnapshotRegistry(logContext), mock(GroupCoordinatorMetricsShard.class), classicGroup, - metadataImage.topics(), - metadataImage.cluster() + new HashMap<>(), + metadataImage ); ConsumerGroup expectedConsumerGroup = new ConsumerGroup( @@ -1546,10 +1550,10 @@ public class ConsumerGroupTest { expectedConsumerGroup.updateTargetAssignment(memberId, new Assignment(mkAssignment( mkTopicAssignment(fooTopicId, 0) ))); - expectedConsumerGroup.setSubscriptionMetadata(Map.of( - fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1), - barTopicName, new TopicMetadata(barTopicId, barTopicName, 1) - )); + expectedConsumerGroup.setMetadataHash(computeGroupHash(Map.of( + fooTopicName, computeTopicHash(fooTopicName, metadataImage), + barTopicName, computeTopicHash(barTopicName, metadataImage) + ))); expectedConsumerGroup.updateMember(new ConsumerGroupMember.Builder(memberId) .setMemberEpoch(classicGroup.generationId()) .setState(MemberState.STABLE) @@ -2262,4 +2266,105 @@ public class ConsumerGroupTest { ) ); } + + @Test + public void testComputeMetadataHash() { + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(Uuid.randomUuid(), "foo", 1) + .addTopic(Uuid.randomUuid(), "bar", 1) + .addRacks() + .build(); + Map cache = new HashMap<>(); + assertEquals( + computeGroupHash(Map.of( + "foo", computeTopicHash("foo", metadataImage), + "bar", computeTopicHash("bar", metadataImage) + )), + ModernGroup.computeMetadataHash( + Map.of( + "foo", new SubscriptionCount(1, 0), + "bar", new SubscriptionCount(1, 0) + ), + cache, + metadataImage + ) + ); + assertEquals( + Map.of( + "foo", computeTopicHash("foo", metadataImage), + "bar", computeTopicHash("bar", metadataImage) + ), + cache + ); + } + + @Test + public void testComputeMetadataHashUseCacheData() { + // Use hash map because topic hash cache cannot be immutable. + Map cache = new HashMap<>(); + cache.put("foo", 1234L); + cache.put("bar", 4321L); + + assertEquals( + computeGroupHash(cache), + ModernGroup.computeMetadataHash( + Map.of( + "foo", new SubscriptionCount(1, 0), + "bar", new SubscriptionCount(1, 0) + ), + cache, + new MetadataImageBuilder() + .addTopic(Uuid.randomUuid(), "foo", 1) + .addTopic(Uuid.randomUuid(), "bar", 1) + .addRacks() + .build() + ) + ); + assertEquals( + Map.of( + "foo", 1234L, + "bar", 4321L + ), + cache + ); + } + + @Test + public void testComputeMetadataHashIgnoreTopicHashIfItIsNotInMetadataImage() { + // Use hash map because topic hash cache cannot be immutable. + // The zar is not in metadata image, so it should not be used. + Map cache = new HashMap<>(); + cache.put("foo", 1234L); + cache.put("bar", 4321L); + cache.put("zar", 0L); + + assertEquals( + computeGroupHash(Map.of( + "foo", 1234L, + "bar", 4321L + )), + ModernGroup.computeMetadataHash( + Map.of( + "foo", new SubscriptionCount(1, 0), + "bar", new SubscriptionCount(1, 0) + ), + cache, + new MetadataImageBuilder() + .addTopic(Uuid.randomUuid(), "foo", 1) + .addTopic(Uuid.randomUuid(), "bar", 1) + .addRacks() + .build() + ) + ); + + // Although the zar is not in metadata image, it should not be removed from computeMetadataHash function. + assertEquals( + Map.of( + "foo", 1234L, + "bar", 4321L, + "zar", 0L + ), + cache + ); + } }