KAFKA-17747: [4/N] Replace subscription metadata with metadata hash in consumer group (#19761)

* Add `topicHashCache` to `GroupMetadataManager`.
* Remove a topic's hash from the cache when the related topic image is updated.
* Ignore topic hashes of 0 when calculating the group metadata hash.
* Add `metadataHash` to `ModernGroup`.
* Replace subscription metadata with metadata hash.
* If there is data in `ConsumerGroupPartitionMetadataValue`, set a flag in the
group so that a tombstone record is added in the next group heartbeat (a
condensed sketch of the new flow follows below).
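
A condensed, self-contained model of the new flow (JDK types only; `MetadataHashModel` and its stand-in hash functions are illustrative assumptions, not Kafka's API — the real logic lives in `Utils.computeTopicHash`, `Utils.computeGroupHash`, and `ModernGroup.computeMetadataHash` in the diff below):

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Condensed model of the heartbeat path: compute the group metadata hash through a
// per-coordinator topic-hash cache and bump the epoch only when the hash changes.
final class MetadataHashModel {
    private final Map<String, Long> topicHashCache = new HashMap<>(); // topic name -> topic hash
    private long groupMetadataHash = 0L; // replaces the per-group subscription metadata map
    private int groupEpoch = 0;

    // Stand-in for Utils.computeTopicHash: 0 is the sentinel for "topic not in the image".
    private static long topicHash(String topic, Map<String, Integer> partitionsByTopic) {
        Integer partitions = partitionsByTopic.get(topic);
        return partitions == null ? 0L : 31L * topic.hashCode() + partitions;
    }

    // Stand-in for Utils.computeGroupHash: deterministic combine; an empty map yields 0.
    private static long groupHash(Map<String, Long> topicHashes) {
        return topicHashes.entrySet().stream()
            .sorted(Map.Entry.comparingByKey())
            .mapToLong(Map.Entry::getValue)
            .reduce(0L, (acc, h) -> acc * 31L + h);
    }

    // Mirrors ModernGroup.computeMetadataHash plus the epoch bump in the heartbeat.
    void onHeartbeat(Set<String> subscribedTopics, Map<String, Integer> partitionsByTopic) {
        Map<String, Long> hashes = new HashMap<>();
        for (String topic : subscribedTopics) {
            if (partitionsByTopic.containsKey(topic)) { // topics absent from the image are skipped
                hashes.put(topic, topicHashCache.computeIfAbsent(
                    topic, t -> topicHash(t, partitionsByTopic)));
            }
        }
        long newHash = groupHash(hashes);
        if (newHash != groupMetadataHash) { // was: !subscriptionMetadata.equals(group.subscriptionMetadata())
            groupMetadataHash = newHash;
            groupEpoch++; // the epoch record now carries the metadata hash
        }
    }
}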

Reviewers: David Jacot <djacot@confluent.io>
PoAn Yang 2025-05-28 07:56:46 -05:00 committed by GitHub
parent 543fb6c848
commit d6ee83a893
10 changed files with 1276 additions and 693 deletions

View File

@ -28,7 +28,6 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataV
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupRegularExpressionKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupRegularExpressionValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
@ -127,36 +126,6 @@ public class GroupCoordinatorRecordHelpers {
);
}
/**
* Creates a ConsumerGroupPartitionMetadata record.
*
* @param groupId The consumer group id.
* @param newSubscriptionMetadata The subscription metadata.
* @return The record.
*/
public static CoordinatorRecord newConsumerGroupSubscriptionMetadataRecord(
String groupId,
Map<String, TopicMetadata> newSubscriptionMetadata
) {
ConsumerGroupPartitionMetadataValue value = new ConsumerGroupPartitionMetadataValue();
newSubscriptionMetadata.forEach((topicName, topicMetadata) ->
value.topics().add(new ConsumerGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(topicMetadata.id())
.setTopicName(topicMetadata.name())
.setNumPartitions(topicMetadata.numPartitions())
)
);
return CoordinatorRecord.record(
new ConsumerGroupPartitionMetadataKey()
.setGroupId(groupId),
new ApiMessageAndVersion(
value,
(short) 0
)
);
}
/**
* Creates a ConsumerGroupPartitionMetadata tombstone.
*

View File

@ -229,7 +229,7 @@ import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.n
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentRecord;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentTombstoneRecord;
@ -490,6 +490,13 @@ public class GroupMetadataManager {
*/
private MetadataImage metadataImage;
/**
* The cache for topic hash value by topic name.
* A topic hash is calculated when a group subscribes to the topic.
* A topic hash is removed when the topic is updated in the MetadataImage or when no group subscribes to it anymore.
*/
private final Map<String, Long> topicHashCache;
/**
* This tracks the version (or the offset) of the last metadata image
* with newly created topics.
@ -550,6 +557,7 @@ public class GroupMetadataManager {
this.shareGroupAssignor = shareGroupAssignor;
this.authorizerPlugin = authorizerPlugin;
this.streamsGroupAssignors = streamsGroupAssignors.stream().collect(Collectors.toMap(TaskAssignor::name, Function.identity()));
this.topicHashCache = new HashMap<>();
}
/**
@ -1355,8 +1363,8 @@ public class GroupMetadataManager {
snapshotRegistry,
metrics,
classicGroup,
metadataImage.topics(),
metadataImage.cluster()
topicHashCache,
metadataImage
);
} catch (SchemaException e) {
log.warn("Cannot upgrade classic group " + classicGroup.groupId() +
@ -3302,26 +3310,24 @@ public class GroupMetadataManager {
));
}
// Compute the subscription metadata.
Map<String, TopicMetadata> subscriptionMetadata = group.computeSubscriptionMetadata(
long groupMetadataHash = ModernGroup.computeMetadataHash(
subscribedTopicNames,
metadataImage.topics(),
metadataImage.cluster()
topicHashCache,
metadataImage
);
if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
if (groupMetadataHash != group.metadataHash()) {
if (log.isDebugEnabled()) {
log.debug("[GroupId {}] Computed new subscription metadata: {}.",
groupId, subscriptionMetadata);
log.debug("[GroupId {}] Computed new metadata hash: {}.",
groupId, groupMetadataHash);
}
bumpGroupEpoch = true;
records.add(newConsumerGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
}
if (bumpGroupEpoch) {
int groupEpoch = group.groupEpoch() + 1;
records.add(newConsumerGroupEpochRecord(groupId, groupEpoch, 0));
log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch);
records.add(newConsumerGroupEpochRecord(groupId, groupEpoch, groupMetadataHash));
log.info("[GroupId {}] Bumped group epoch to {} with metadata hash {}.", groupId, groupEpoch, groupMetadataHash);
metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
group.setMetadataRefreshDeadline(
time.milliseconds() + METADATA_REFRESH_INTERVAL_MS,
@ -3608,10 +3614,11 @@ public class GroupMetadataManager {
member,
updatedMember
);
Map<String, TopicMetadata> subscriptionMetadata = group.computeSubscriptionMetadata(
long groupMetadataHash = ModernGroup.computeMetadataHash(
subscribedTopicNamesMap,
metadataImage.topics(),
metadataImage.cluster()
topicHashCache,
metadataImage
);
int numMembers = group.numMembers();
@ -3625,24 +3632,30 @@ public class GroupMetadataManager {
numMembers
);
if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
if (groupMetadataHash != group.metadataHash()) {
if (log.isDebugEnabled()) {
log.debug("[GroupId {}] Computed new subscription metadata: {}.",
groupId, subscriptionMetadata);
log.debug("[GroupId {}] Computed new metadata hash: {}.",
groupId, groupMetadataHash);
}
bumpGroupEpoch = true;
records.add(newConsumerGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
}
if (bumpGroupEpoch) {
groupEpoch += 1;
records.add(newConsumerGroupEpochRecord(groupId, groupEpoch, 0));
log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch);
records.add(newConsumerGroupEpochRecord(groupId, groupEpoch, groupMetadataHash));
log.info("[GroupId {}] Bumped group epoch to {} with metadata hash {}.", groupId, groupEpoch, groupMetadataHash);
metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
}
group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch);
// Up to 4.0, the coordinator used subscription metadata to keep topic metadata.
// From 4.1 onward, the subscription metadata is replaced by the metadata hash. If there is a subscription metadata record in the log,
// add a tombstone record to remove it.
if (group.hasSubscriptionMetadataRecord()) {
records.add(newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId));
}
return new UpdateSubscriptionMetadataResult(
groupEpoch,
subscriptionType
@ -4015,25 +4028,27 @@ public class GroupMetadataManager {
members
);
// We update the subscription metadata without the leaving members.
Map<String, TopicMetadata> subscriptionMetadata = group.computeSubscriptionMetadata(
group.computeSubscribedTopicNamesWithoutDeletedMembers(members, deletedRegexes),
metadataImage.topics(),
metadataImage.cluster()
Map<String, SubscriptionCount> subscribedTopicNamesMap = group.computeSubscribedTopicNamesWithoutDeletedMembers(
members,
deletedRegexes
);
long groupMetadataHash = ModernGroup.computeMetadataHash(
subscribedTopicNamesMap,
topicHashCache,
metadataImage
);
if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
if (groupMetadataHash != group.metadataHash()) {
if (log.isDebugEnabled()) {
log.debug("[GroupId {}] Computed new subscription metadata: {}.",
group.groupId(), subscriptionMetadata);
log.debug("[GroupId {}] Computed new metadata hash: {}.",
group.groupId(), groupMetadataHash);
}
records.add(newConsumerGroupSubscriptionMetadataRecord(group.groupId(), subscriptionMetadata));
}
// We bump the group epoch.
int groupEpoch = group.groupEpoch() + 1;
records.add(newConsumerGroupEpochRecord(group.groupId(), groupEpoch, 0));
log.info("[GroupId {}] Bumped group epoch to {}.", group.groupId(), groupEpoch);
records.add(newConsumerGroupEpochRecord(group.groupId(), groupEpoch, groupMetadataHash));
log.info("[GroupId {}] Bumped group epoch to {} with metadata hash {}.", group.groupId(), groupEpoch, groupMetadataHash);
for (ConsumerGroupMember member : members) {
cancelTimers(group.groupId(), member.memberId());
@ -4981,7 +4996,11 @@ public class GroupMetadataManager {
) {
groupsByTopics.computeIfPresent(topicName, (__, groupIds) -> {
groupIds.remove(groupId);
return groupIds.isEmpty() ? null : groupIds;
if (groupIds.isEmpty()) {
topicHashCache.remove(topicName);
return null;
}
return groupIds;
});
}
@ -5036,6 +5055,7 @@ public class GroupMetadataManager {
if (value != null) {
ConsumerGroup consumerGroup = getOrMaybeCreatePersistedConsumerGroup(groupId, true);
consumerGroup.setGroupEpoch(value.epoch());
consumerGroup.setMetadataHash(value.metadataHash());
} else {
ConsumerGroup consumerGroup;
try {
@ -5085,15 +5105,9 @@ public class GroupMetadataManager {
return;
}
if (value != null) {
Map<String, TopicMetadata> subscriptionMetadata = new HashMap<>();
value.topics().forEach(topicMetadata -> {
subscriptionMetadata.put(topicMetadata.topicName(), TopicMetadata.fromRecord(topicMetadata));
});
group.setSubscriptionMetadata(subscriptionMetadata);
} else {
group.setSubscriptionMetadata(Map.of());
}
// If value is not null, add a subscription metadata tombstone record in the next consumer group heartbeat,
// because the subscription metadata is replaced by the metadata hash in ConsumerGroupMetadataValue.
group.setHasSubscriptionMetadataRecord(value != null);
}
/**
@ -5734,11 +5748,15 @@ public class GroupMetadataManager {
Set<String> allGroupIds = new HashSet<>();
topicsDelta.changedTopics().forEach((topicId, topicDelta) -> {
String topicName = topicDelta.name();
// Remove topic hash from the cache to recalculate it.
topicHashCache.remove(topicName);
allGroupIds.addAll(groupsSubscribedToTopic(topicName));
});
topicsDelta.deletedTopicIds().forEach(topicId -> {
TopicImage topicImage = delta.image().topics().getTopic(topicId);
allGroupIds.addAll(groupsSubscribedToTopic(topicImage.name()));
String topicName = topicImage.name();
topicHashCache.remove(topicName);
allGroupIds.addAll(groupsSubscribedToTopic(topicName));
});
allGroupIds.forEach(groupId -> {
Group group = groups.get(groupId);
@ -8375,6 +8393,10 @@ public class GroupMetadataManager {
return Collections.unmodifiableSet(this.groups.keySet());
}
// Visible for testing
Map<String, Long> topicHashCache() {
return Collections.unmodifiableMap(this.topicHashCache);
}
/**
* Get the session timeout of the provided consumer group.

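Taken together, the hunks above give the cache two eviction paths: a topic's hash is dropped when the topic changes or is deleted in the new MetadataImage, and when its last subscribed group goes away. A minimal JDK-only sketch of that lifecycle (class and method names are illustrative; only the `topicHashCache` and `groupsByTopics` fields mirror GroupMetadataManager):

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class TopicHashCacheModel {
    private final Map<String, Long> topicHashCache = new HashMap<>();
    private final Map<String, Set<String>> groupsByTopics = new HashMap<>();

    // Eviction path 1: the topic changed or was deleted in the new MetadataImage,
    // so its hash is dropped and lazily recomputed on the next heartbeat.
    void onMetadataImageChange(Set<String> changedOrDeletedTopics) {
        changedOrDeletedTopics.forEach(topicHashCache::remove);
    }

    // Eviction path 2: the last subscribed group goes away, so the entry is useless.
    void removeGroupSubscription(String groupId, String topicName) {
        groupsByTopics.computeIfPresent(topicName, (name, groupIds) -> {
            groupIds.remove(groupId);
            if (groupIds.isEmpty()) {
                topicHashCache.remove(topicName);
                return null; // removes the groupsByTopics entry as well
            }
            return groupIds;
        });
    }
}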
View File

@ -348,7 +348,7 @@ public class Utils {
* @param topicHashes The map of topic hashes. Key is topic name and value is the topic hash.
* @return The hash of the group.
*/
static long computeGroupHash(Map<String, Long> topicHashes) {
public static long computeGroupHash(Map<String, Long> topicHashes) {
if (topicHashes.isEmpty()) {
return 0;
}
@ -386,7 +386,7 @@ public class Utils {
* @param metadataImage The cluster image.
* @return The hash of the topic.
*/
static long computeTopicHash(String topicName, MetadataImage metadataImage) {
public static long computeTopicHash(String topicName, MetadataImage metadataImage) {
TopicImage topicImage = metadataImage.topics().getTopic(topicName);
if (topicImage == null) {
return 0;
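
With these two helpers now public, callers can rely on the hashing convention visible in the hunks above: `computeTopicHash` returns 0 for a topic absent from the image, and `computeGroupHash` returns 0 for an empty map, so 0 acts as the "ignore" sentinel mentioned in the commit message. A hedged caller sketch (the method `groupHashOf` is hypothetical; assumes java.util.* and org.apache.kafka.image.MetadataImage are imported):

static long groupHashOf(Set<String> subscribedTopicNames, MetadataImage image) {
    Map<String, Long> topicHashes = new HashMap<>();
    for (String topic : subscribedTopicNames) {
        long hash = Utils.computeTopicHash(topic, image); // 0 when the topic is absent from the image
        if (hash != 0) {
            topicHashes.put(topic, hash); // the 0 sentinel never feeds the group hash
        }
    }
    return Utils.computeGroupHash(topicHashes); // 0 for an empty map
}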

View File

@ -20,13 +20,16 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.coordinator.group.Utils;
import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType;
import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineInteger;
import org.apache.kafka.timeline.TimelineLong;
import org.apache.kafka.timeline.TimelineObject;
import java.util.Collections;
@ -88,6 +91,11 @@ public abstract class ModernGroup<T extends ModernGroupMember> implements Group
*/
protected final TimelineHashMap<String, TopicMetadata> subscribedTopicMetadata;
/**
* The metadata hash, which is computed based on all the subscribed topics.
*/
protected final TimelineLong metadataHash;
/**
* The group's subscription type.
* This value is set to Homogeneous by default.
@ -134,6 +142,7 @@ public abstract class ModernGroup<T extends ModernGroupMember> implements Group
this.members = new TimelineHashMap<>(snapshotRegistry, 0);
this.subscribedTopicNames = new TimelineHashMap<>(snapshotRegistry, 0);
this.subscribedTopicMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
this.metadataHash = new TimelineLong(snapshotRegistry);
this.subscriptionType = new TimelineObject<>(snapshotRegistry, HOMOGENEOUS);
this.targetAssignmentEpoch = new TimelineInteger(snapshotRegistry);
this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
@ -355,6 +364,13 @@ public abstract class ModernGroup<T extends ModernGroupMember> implements Group
return Collections.unmodifiableMap(subscribedTopicMetadata);
}
/**
* @return The metadata hash.
*/
public long metadataHash() {
return metadataHash.get();
}
/**
* Updates the subscription metadata. This replaces the previous one.
*
@ -367,6 +383,15 @@ public abstract class ModernGroup<T extends ModernGroupMember> implements Group
this.subscribedTopicMetadata.putAll(subscriptionMetadata);
}
/**
* Updates the metadata hash.
*
* @param metadataHash The new metadata hash.
*/
public void setMetadataHash(long metadataHash) {
this.metadataHash.set(metadataHash);
}
/**
* Computes the subscription metadata based on the current subscription info.
*
@ -398,6 +423,24 @@ public abstract class ModernGroup<T extends ModernGroupMember> implements Group
return Collections.unmodifiableMap(newSubscriptionMetadata);
}
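/**
 * Computes the group metadata hash from the subscribed topics, using (and filling)
 * the per-coordinator topic hash cache. Topics that are missing from the metadata
 * image are skipped, so they never contribute a hash of 0.
 *
 * @param subscribedTopicNames The subscribed topic names with their subscription counts.
 * @param topicHashCache The cache for topic hashes, keyed by topic name.
 * @param metadataImage The current metadata image.
 * @return The metadata hash of the group.
 */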
public static long computeMetadataHash(
Map<String, SubscriptionCount> subscribedTopicNames,
Map<String, Long> topicHashCache,
MetadataImage metadataImage
) {
Map<String, Long> topicHash = new HashMap<>(subscribedTopicNames.size());
subscribedTopicNames.keySet().forEach(topicName -> {
TopicImage topicImage = metadataImage.topics().getTopic(topicName);
if (topicImage != null) {
topicHash.put(
topicName,
topicHashCache.computeIfAbsent(topicName, k -> Utils.computeTopicHash(topicName, metadataImage))
);
}
});
return Utils.computeGroupHash(topicHash);
}
/**
* Updates the metadata refresh deadline.
*

View File

@ -43,7 +43,7 @@ import org.apache.kafka.coordinator.group.modern.MemberState;
import org.apache.kafka.coordinator.group.modern.ModernGroup;
import org.apache.kafka.coordinator.group.modern.ModernGroupMember;
import org.apache.kafka.coordinator.group.modern.SubscriptionCount;
import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
@ -152,6 +152,8 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
*/
private final TimelineHashMap<String, ResolvedRegularExpression> resolvedRegularExpressions;
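/**
 * Whether a legacy ConsumerGroupPartitionMetadataValue record for this group is still in the log.
 * If true, the next consumer group heartbeat appends a tombstone record to remove it.
 */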
private final TimelineObject<Boolean> hasSubscriptionMetadataRecord;
public ConsumerGroup(
SnapshotRegistry snapshotRegistry,
String groupId,
@ -167,6 +169,7 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
this.currentPartitionEpoch = new TimelineHashMap<>(snapshotRegistry, 0);
this.subscribedRegularExpressions = new TimelineHashMap<>(snapshotRegistry, 0);
this.resolvedRegularExpressions = new TimelineHashMap<>(snapshotRegistry, 0);
this.hasSubscriptionMetadataRecord = new TimelineObject<>(snapshotRegistry, false);
}
/**
@ -1130,8 +1133,8 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
* @param snapshotRegistry The SnapshotRegistry.
* @param metrics The GroupCoordinatorMetricsShard.
* @param classicGroup The converted classic group.
* @param topicsImage The current metadata for all available topics.
* @param clusterImage The current metadata for the Kafka cluster.
* @param topicHashCache The cache for topic hashes.
* @param metadataImage The current metadata image for the Kafka cluster.
* @return The created ConsumerGroup.
*
* @throws SchemaException if any member's subscription or assignment cannot be deserialized.
@ -1141,8 +1144,8 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
SnapshotRegistry snapshotRegistry,
GroupCoordinatorMetricsShard metrics,
ClassicGroup classicGroup,
TopicsImage topicsImage,
ClusterImage clusterImage
Map<String, Long> topicHashCache,
MetadataImage metadataImage
) {
String groupId = classicGroup.groupId();
ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
@ -1162,7 +1165,7 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
if (assignment.userData() != null && assignment.userData().hasRemaining()) {
throw new UnsupportedVersionException("userData from a custom assignor would be lost");
}
assignedPartitions = toTopicPartitionMap(assignment, topicsImage);
assignedPartitions = toTopicPartitionMap(assignment, metadataImage.topics());
}
// Every member is guaranteed to have metadata set when it joins,
@ -1198,10 +1201,10 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
consumerGroup.updateMember(newMember);
});
consumerGroup.setSubscriptionMetadata(consumerGroup.computeSubscriptionMetadata(
consumerGroup.setMetadataHash(ModernGroup.computeMetadataHash(
consumerGroup.subscribedTopicNames(),
topicsImage,
clusterImage
topicHashCache,
metadataImage
));
return consumerGroup;
@ -1219,9 +1222,7 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId(), consumerGroupMember))
);
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId(), subscriptionMetadata()));
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId(), groupEpoch(), 0));
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId(), groupEpoch(), metadataHash()));
members().forEach((consumerGroupMemberId, consumerGroupMember) ->
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(
@ -1309,4 +1310,12 @@ public class ConsumerGroup extends ModernGroup<ConsumerGroupMember> {
}
return false;
}
public void setHasSubscriptionMetadataRecord(boolean hasSubscriptionMetadataRecord) {
this.hasSubscriptionMetadataRecord.set(hasSubscriptionMetadataRecord);
}
public boolean hasSubscriptionMetadataRecord() {
return hasSubscriptionMetadataRecord.get();
}
}
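
End to end, the flag works like this: replay records whether a legacy `ConsumerGroupPartitionMetadataValue` exists, the next heartbeat appends a tombstone, and replaying that tombstone clears the flag. A minimal JDK-only model (the class and the record strings are illustrative; only the flag semantics mirror `hasSubscriptionMetadataRecord` above):

import java.util.ArrayList;
import java.util.List;

final class LegacyCleanupModel {
    private boolean hasSubscriptionMetadataRecord = false;

    // Replay path: a non-null ConsumerGroupPartitionMetadataValue means a pre-4.1
    // subscription metadata record is still in the log; replaying the tombstone
    // written below passes null and clears the flag again.
    void replayPartitionMetadata(Object value) {
        hasSubscriptionMetadataRecord = (value != null);
    }

    // Heartbeat path: append one tombstone to delete the obsolete record.
    List<String> recordsForNextHeartbeat() {
        List<String> records = new ArrayList<>();
        if (hasSubscriptionMetadataRecord) {
            records.add("ConsumerGroupPartitionMetadata tombstone");
        }
        return records;
    }
}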

View File

@ -33,7 +33,6 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataV
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupRegularExpressionKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupRegularExpressionValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
@ -49,7 +48,6 @@ import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue;
import org.apache.kafka.coordinator.group.modern.MemberState;
import org.apache.kafka.coordinator.group.modern.TopicMetadata;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
import org.apache.kafka.coordinator.group.modern.consumer.ResolvedRegularExpression;
import org.apache.kafka.coordinator.group.modern.share.ShareGroup.InitMapValue;
@ -62,7 +60,6 @@ import org.junit.jupiter.params.provider.MethodSource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@ -72,7 +69,6 @@ import java.util.OptionalLong;
import java.util.Set;
import java.util.stream.Stream;
import static org.apache.kafka.coordinator.group.Assertions.assertRecordEquals;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkOrderedAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkOrderedTopicAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
@ -82,7 +78,6 @@ import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.n
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupEpochTombstoneRecord;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord;
@ -155,47 +150,6 @@ public class GroupCoordinatorRecordHelpersTest {
));
}
@Test
public void testNewConsumerGroupSubscriptionMetadataRecord() {
Uuid fooTopicId = Uuid.randomUuid();
Uuid barTopicId = Uuid.randomUuid();
Map<String, TopicMetadata> subscriptionMetadata = new LinkedHashMap<>();
subscriptionMetadata.put("foo", new TopicMetadata(
fooTopicId,
"foo",
10
));
subscriptionMetadata.put("bar", new TopicMetadata(
barTopicId,
"bar",
20
));
CoordinatorRecord expectedRecord = CoordinatorRecord.record(
new ConsumerGroupPartitionMetadataKey()
.setGroupId("group-id"),
new ApiMessageAndVersion(
new ConsumerGroupPartitionMetadataValue()
.setTopics(Arrays.asList(
new ConsumerGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(fooTopicId)
.setTopicName("foo")
.setNumPartitions(10),
new ConsumerGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(barTopicId)
.setTopicName("bar")
.setNumPartitions(20))),
(short) 0
)
);
assertRecordEquals(expectedRecord, newConsumerGroupSubscriptionMetadataRecord(
"group-id",
subscriptionMetadata
));
}
@Test
public void testNewConsumerGroupSubscriptionMetadataTombstoneRecord() {
CoordinatorRecord expectedRecord = CoordinatorRecord.tombstone(
@ -208,47 +162,6 @@ public class GroupCoordinatorRecordHelpersTest {
));
}
@Test
public void testEmptyPartitionMetadataWhenRacksUnavailableGroupSubscriptionMetadataRecord() {
Uuid fooTopicId = Uuid.randomUuid();
Uuid barTopicId = Uuid.randomUuid();
Map<String, TopicMetadata> subscriptionMetadata = new LinkedHashMap<>();
subscriptionMetadata.put("foo", new TopicMetadata(
fooTopicId,
"foo",
10
));
subscriptionMetadata.put("bar", new TopicMetadata(
barTopicId,
"bar",
20
));
CoordinatorRecord expectedRecord = CoordinatorRecord.record(
new ConsumerGroupPartitionMetadataKey()
.setGroupId("group-id"),
new ApiMessageAndVersion(
new ConsumerGroupPartitionMetadataValue()
.setTopics(Arrays.asList(
new ConsumerGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(fooTopicId)
.setTopicName("foo")
.setNumPartitions(10),
new ConsumerGroupPartitionMetadataValue.TopicMetadata()
.setTopicId(barTopicId)
.setTopicName("bar")
.setNumPartitions(20))),
(short) 0
)
);
assertRecordEquals(expectedRecord, newConsumerGroupSubscriptionMetadataRecord(
"group-id",
subscriptionMetadata
));
}
@Test
public void testNewConsumerGroupEpochRecord() {
CoordinatorRecord expectedRecord = CoordinatorRecord.record(

View File

@ -556,7 +556,7 @@ public class GroupMetadataManagerTestContext {
groupConfigManager
);
consumerGroupBuilders.forEach(builder -> builder.build(metadataImage.topics()).forEach(context::replay));
consumerGroupBuilders.forEach(builder -> builder.build().forEach(context::replay));
shareGroupBuilders.forEach(builder -> builder.build(metadataImage.topics()).forEach(context::replay));
streamsGroupBuilders.forEach(builder -> builder.build().forEach(context::replay));

View File

@ -20,9 +20,6 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers;
import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.TopicMetadata;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsImage;
import java.util.ArrayList;
import java.util.HashMap;
@ -36,7 +33,7 @@ public class ConsumerGroupBuilder {
private int assignmentEpoch;
private final Map<String, ConsumerGroupMember> members = new HashMap<>();
private final Map<String, Assignment> assignments = new HashMap<>();
private Map<String, TopicMetadata> subscriptionMetadata;
private long metadataHash = 0L;
private final Map<String, ResolvedRegularExpression> resolvedRegularExpressions = new HashMap<>();
public ConsumerGroupBuilder(String groupId, int groupEpoch) {
@ -58,8 +55,8 @@ public class ConsumerGroupBuilder {
return this;
}
public ConsumerGroupBuilder withSubscriptionMetadata(Map<String, TopicMetadata> subscriptionMetadata) {
this.subscriptionMetadata = subscriptionMetadata;
public ConsumerGroupBuilder withMetadataHash(long metadataHash) {
this.metadataHash = metadataHash;
return this;
}
@ -73,7 +70,7 @@ public class ConsumerGroupBuilder {
return this;
}
public List<CoordinatorRecord> build(TopicsImage topicsImage) {
public List<CoordinatorRecord> build() {
List<CoordinatorRecord> records = new ArrayList<>();
// Add subscription records for members.
@ -86,29 +83,8 @@ public class ConsumerGroupBuilder {
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord(groupId, regex, resolvedRegularExpression))
);
// Add subscription metadata.
if (subscriptionMetadata == null) {
subscriptionMetadata = new HashMap<>();
members.forEach((memberId, member) ->
member.subscribedTopicNames().forEach(topicName -> {
TopicImage topicImage = topicsImage.getTopic(topicName);
if (topicImage != null) {
subscriptionMetadata.put(topicName, new TopicMetadata(
topicImage.id(),
topicImage.name(),
topicImage.partitions().size()
));
}
})
);
}
if (!subscriptionMetadata.isEmpty()) {
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
}
// Add group epoch record.
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, groupEpoch, 0));
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, groupEpoch, metadataHash));
// Add target assignment records.
assignments.forEach((memberId, assignment) ->

View File

@ -46,6 +46,7 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataV
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.MemberState;
import org.apache.kafka.coordinator.group.modern.ModernGroup;
import org.apache.kafka.coordinator.group.modern.SubscriptionCount;
import org.apache.kafka.coordinator.group.modern.TopicMetadata;
import org.apache.kafka.image.MetadataImage;
@ -57,6 +58,7 @@ import org.junit.jupiter.params.provider.CsvSource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -70,6 +72,8 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.coordinator.group.Assertions.assertUnorderedRecordsEquals;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static org.apache.kafka.coordinator.group.Utils.computeGroupHash;
import static org.apache.kafka.coordinator.group.Utils.computeTopicHash;
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS;
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABLE;
@ -1532,8 +1536,8 @@ public class ConsumerGroupTest {
new SnapshotRegistry(logContext),
mock(GroupCoordinatorMetricsShard.class),
classicGroup,
metadataImage.topics(),
metadataImage.cluster()
new HashMap<>(),
metadataImage
);
ConsumerGroup expectedConsumerGroup = new ConsumerGroup(
@ -1546,10 +1550,10 @@ public class ConsumerGroupTest {
expectedConsumerGroup.updateTargetAssignment(memberId, new Assignment(mkAssignment(
mkTopicAssignment(fooTopicId, 0)
)));
expectedConsumerGroup.setSubscriptionMetadata(Map.of(
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
));
expectedConsumerGroup.setMetadataHash(computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage),
barTopicName, computeTopicHash(barTopicName, metadataImage)
)));
expectedConsumerGroup.updateMember(new ConsumerGroupMember.Builder(memberId)
.setMemberEpoch(classicGroup.generationId())
.setState(MemberState.STABLE)
@ -2262,4 +2266,105 @@ public class ConsumerGroupTest {
)
);
}
@Test
public void testComputeMetadataHash() {
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(Uuid.randomUuid(), "foo", 1)
.addTopic(Uuid.randomUuid(), "bar", 1)
.addRacks()
.build();
Map<String, Long> cache = new HashMap<>();
assertEquals(
computeGroupHash(Map.of(
"foo", computeTopicHash("foo", metadataImage),
"bar", computeTopicHash("bar", metadataImage)
)),
ModernGroup.computeMetadataHash(
Map.of(
"foo", new SubscriptionCount(1, 0),
"bar", new SubscriptionCount(1, 0)
),
cache,
metadataImage
)
);
assertEquals(
Map.of(
"foo", computeTopicHash("foo", metadataImage),
"bar", computeTopicHash("bar", metadataImage)
),
cache
);
}
@Test
public void testComputeMetadataHashUseCacheData() {
// Use a HashMap because the topic hash cache must be mutable.
Map<String, Long> cache = new HashMap<>();
cache.put("foo", 1234L);
cache.put("bar", 4321L);
assertEquals(
computeGroupHash(cache),
ModernGroup.computeMetadataHash(
Map.of(
"foo", new SubscriptionCount(1, 0),
"bar", new SubscriptionCount(1, 0)
),
cache,
new MetadataImageBuilder()
.addTopic(Uuid.randomUuid(), "foo", 1)
.addTopic(Uuid.randomUuid(), "bar", 1)
.addRacks()
.build()
)
);
assertEquals(
Map.of(
"foo", 1234L,
"bar", 4321L
),
cache
);
}
@Test
public void testComputeMetadataHashIgnoreTopicHashIfItIsNotInMetadataImage() {
// Use a HashMap because the topic hash cache must be mutable.
// Topic zar is not in the metadata image, so its cached hash should not be used.
Map<String, Long> cache = new HashMap<>();
cache.put("foo", 1234L);
cache.put("bar", 4321L);
cache.put("zar", 0L);
assertEquals(
computeGroupHash(Map.of(
"foo", 1234L,
"bar", 4321L
)),
ModernGroup.computeMetadataHash(
Map.of(
"foo", new SubscriptionCount(1, 0),
"bar", new SubscriptionCount(1, 0)
),
cache,
new MetadataImageBuilder()
.addTopic(Uuid.randomUuid(), "foo", 1)
.addTopic(Uuid.randomUuid(), "bar", 1)
.addRacks()
.build()
)
);
// Although zar is not in the metadata image, computeMetadataHash does not remove it from the cache.
assertEquals(
Map.of(
"foo", 1234L,
"bar", 4321L,
"zar", 0L
),
cache
);
}
}