From 98fbd8afc7f3ba806d742690536090936738f1e7 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Wed, 5 Jul 2023 18:28:38 +0200 Subject: [PATCH] KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image (#13901) This patch adds (1) the logic to propagate a new MetadataImage to the running coordinators; and (2) the logic to ensure that all the consumer groups subscribed to topics with changes will refresh their subscriptions metadata on the next heartbeat. In the mean time, it ensures that freshly loaded consumer groups also refresh their subscriptions metadata on the next heartbeat. Reviewers: Jeff Kim , Justine Olshan --- checkstyle/suppressions.xml | 2 +- .../group/GroupCoordinatorService.java | 1 + .../group/GroupMetadataManager.java | 223 +++++++- .../group/ReplicatedGroupCoordinator.java | 24 + .../group/consumer/ConsumerGroup.java | 78 ++- .../group/runtime/Coordinator.java | 16 +- .../group/runtime/CoordinatorRuntime.java | 40 +- .../group/GroupMetadataManagerTest.java | 524 ++++++++++++++++-- .../group/consumer/ConsumerGroupTest.java | 74 ++- .../group/runtime/CoordinatorRuntimeTest.java | 61 +- 10 files changed, 965 insertions(+), 78 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index d7e36890cdd..6e2a8c0ca4e 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -323,7 +323,7 @@ + files="(GroupMetadataManager|ConsumerGroupTest|GroupMetadataManagerTest).java"/> assignors = null; - private TopicsImage topicsImage = null; private int consumerGroupMaxSize = Integer.MAX_VALUE; private int consumerGroupHeartbeatIntervalMs = 5000; + private int consumerGroupMetadataRefreshIntervalMs = Integer.MAX_VALUE; + private MetadataImage metadataImage = null; Builder withLogContext(LogContext logContext) { this.logContext = logContext; @@ -103,6 +110,11 @@ public class GroupMetadataManager { return this; } + Builder withTime(Time time) { + this.time = time; + return this; + } + Builder withAssignors(List assignors) { this.assignors = assignors; return this; @@ -118,15 +130,21 @@ public class GroupMetadataManager { return this; } - Builder withTopicsImage(TopicsImage topicsImage) { - this.topicsImage = topicsImage; + Builder withConsumerGroupMetadataRefreshIntervalMs(int consumerGroupMetadataRefreshIntervalMs) { + this.consumerGroupMetadataRefreshIntervalMs = consumerGroupMetadataRefreshIntervalMs; + return this; + } + + Builder withMetadataImage(MetadataImage metadataImage) { + this.metadataImage = metadataImage; return this; } GroupMetadataManager build() { if (logContext == null) logContext = new LogContext(); if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext); - if (topicsImage == null) topicsImage = TopicsImage.EMPTY; + if (metadataImage == null) metadataImage = MetadataImage.EMPTY; + if (time == null) time = Time.SYSTEM; if (assignors == null || assignors.isEmpty()) { throw new IllegalStateException("Assignors must be set before building."); @@ -135,10 +153,12 @@ public class GroupMetadataManager { return new GroupMetadataManager( snapshotRegistry, logContext, + time, assignors, - topicsImage, + metadataImage, consumerGroupMaxSize, - consumerGroupHeartbeatIntervalMs + consumerGroupHeartbeatIntervalMs, + consumerGroupMetadataRefreshIntervalMs ); } } @@ -153,6 +173,11 @@ public class GroupMetadataManager { */ private final SnapshotRegistry snapshotRegistry; + /** + * The system time. + */ + private final Time time; + /** * The supported partition assignors keyed by their name. */ @@ -168,6 +193,11 @@ public class GroupMetadataManager { */ private final TimelineHashMap groups; + /** + * The group ids keyed by topic names. + */ + private final TimelineHashMap> groupsByTopics; + /** * The maximum number of members allowed in a single consumer group. */ @@ -179,26 +209,43 @@ public class GroupMetadataManager { private final int consumerGroupHeartbeatIntervalMs; /** - * The topics metadata (or image). + * The metadata refresh interval. */ - private TopicsImage topicsImage; + private final int consumerGroupMetadataRefreshIntervalMs; + + /** + * The metadata image. + */ + private MetadataImage metadataImage; private GroupMetadataManager( SnapshotRegistry snapshotRegistry, LogContext logContext, + Time time, List assignors, - TopicsImage topicsImage, + MetadataImage metadataImage, int consumerGroupMaxSize, - int consumerGroupHeartbeatIntervalMs + int consumerGroupHeartbeatIntervalMs, + int consumerGroupMetadataRefreshIntervalMs ) { this.log = logContext.logger(GroupMetadataManager.class); this.snapshotRegistry = snapshotRegistry; - this.topicsImage = topicsImage; + this.time = time; + this.metadataImage = metadataImage; this.assignors = assignors.stream().collect(Collectors.toMap(PartitionAssignor::name, Function.identity())); this.defaultAssignor = assignors.get(0); this.groups = new TimelineHashMap<>(snapshotRegistry, 0); + this.groupsByTopics = new TimelineHashMap<>(snapshotRegistry, 0); this.consumerGroupMaxSize = consumerGroupMaxSize; this.consumerGroupHeartbeatIntervalMs = consumerGroupHeartbeatIntervalMs; + this.consumerGroupMetadataRefreshIntervalMs = consumerGroupMetadataRefreshIntervalMs; + } + + /** + * @return The current metadata image used by the group metadata manager. + */ + public MetadataImage image() { + return metadataImage; } /** @@ -472,7 +519,8 @@ public class GroupMetadataManager { String assignorName, List ownedTopicPartitions ) throws ApiException { - List records = new ArrayList<>(); + final long currentTimeMs = time.milliseconds(); + final List records = new ArrayList<>(); // Get or create the consumer group. boolean createIfNotExists = memberEpoch == 0; @@ -506,30 +554,47 @@ public class GroupMetadataManager { .setClientHost(clientHost) .build(); + boolean bumpGroupEpoch = false; if (!updatedMember.equals(member)) { records.add(newMemberSubscriptionRecord(groupId, updatedMember)); if (!updatedMember.subscribedTopicNames().equals(member.subscribedTopicNames())) { log.info("[GroupId " + groupId + "] Member " + memberId + " updated its subscribed topics to: " + updatedMember.subscribedTopicNames()); + bumpGroupEpoch = true; + } - subscriptionMetadata = group.computeSubscriptionMetadata( - member, - updatedMember, - topicsImage - ); + if (!updatedMember.subscribedTopicRegex().equals(member.subscribedTopicRegex())) { + log.info("[GroupId " + groupId + "] Member " + memberId + " updated its subscribed regex to: " + + updatedMember.subscribedTopicRegex()); + bumpGroupEpoch = true; + } + } - if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { - log.info("[GroupId " + groupId + "] Computed new subscription metadata: " - + subscriptionMetadata + "."); - records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata)); - } + if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) { + // The subscription metadata is updated in two cases: + // 1) The member has updated its subscriptions; + // 2) The refresh deadline has been reached. + subscriptionMetadata = group.computeSubscriptionMetadata( + member, + updatedMember, + metadataImage.topics() + ); + if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { + log.info("[GroupId " + groupId + "] Computed new subscription metadata: " + + subscriptionMetadata + "."); + bumpGroupEpoch = true; + records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata)); + } + + if (bumpGroupEpoch) { groupEpoch += 1; records.add(newGroupEpochRecord(groupId, groupEpoch)); - log.info("[GroupId " + groupId + "] Bumped group epoch to " + groupEpoch + "."); } + + group.setMetadataRefreshDeadline(currentTimeMs + consumerGroupMetadataRefreshIntervalMs, groupEpoch); } // 2. Update the target assignment if the group epoch is larger than the target assignment epoch. The @@ -635,7 +700,7 @@ public class GroupMetadataManager { Map subscriptionMetadata = group.computeSubscriptionMetadata( member, null, - topicsImage + metadataImage.topics() ); if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { @@ -709,14 +774,15 @@ public class GroupMetadataManager { String groupId = key.groupId(); String memberId = key.memberId(); + ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, value != null); + Set oldSubscribedTopicNames = new HashSet<>(consumerGroup.subscribedTopicNames()); + if (value != null) { - ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, true); ConsumerGroupMember oldMember = consumerGroup.getOrMaybeCreateMember(memberId, true); consumerGroup.updateMember(new ConsumerGroupMember.Builder(oldMember) .updateWith(value) .build()); } else { - ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, false); ConsumerGroupMember oldMember = consumerGroup.getOrMaybeCreateMember(memberId, false); if (oldMember.memberEpoch() != -1) { throw new IllegalStateException("Received a tombstone record to delete member " + memberId @@ -728,6 +794,81 @@ public class GroupMetadataManager { } consumerGroup.removeMember(memberId); } + + updateGroupsByTopics(groupId, oldSubscribedTopicNames, consumerGroup.subscribedTopicNames()); + } + + /** + * @return The set of groups subscribed to the topic. + */ + public Set groupsSubscribedToTopic(String topicName) { + Set groups = groupsByTopics.get(topicName); + return groups != null ? groups : Collections.emptySet(); + } + + /** + * Subscribes a group to a topic. + * + * @param groupId The group id. + * @param topicName The topic name. + */ + private void subscribeGroupToTopic( + String groupId, + String topicName + ) { + groupsByTopics + .computeIfAbsent(topicName, __ -> new TimelineHashSet<>(snapshotRegistry, 1)) + .add(groupId); + } + + /** + * Unsubscribes a group from a topic. + * + * @param groupId The group id. + * @param topicName The topic name. + */ + private void unsubscribeGroupFromTopic( + String groupId, + String topicName + ) { + groupsByTopics.computeIfPresent(topicName, (__, groupIds) -> { + groupIds.remove(groupId); + return groupIds.isEmpty() ? null : groupIds; + }); + } + + /** + * Updates the group by topics mapping. + * + * @param groupId The group id. + * @param oldSubscribedTopics The old group subscriptions. + * @param newSubscribedTopics The new group subscriptions. + */ + private void updateGroupsByTopics( + String groupId, + Set oldSubscribedTopics, + Set newSubscribedTopics + ) { + if (oldSubscribedTopics.isEmpty()) { + newSubscribedTopics.forEach(topicName -> + subscribeGroupToTopic(groupId, topicName) + ); + } else if (newSubscribedTopics.isEmpty()) { + oldSubscribedTopics.forEach(topicName -> + unsubscribeGroupFromTopic(groupId, topicName) + ); + } else { + oldSubscribedTopics.forEach(topicName -> { + if (!newSubscribedTopics.contains(topicName)) { + unsubscribeGroupFromTopic(groupId, topicName); + } + }); + newSubscribedTopics.forEach(topicName -> { + if (!oldSubscribedTopics.contains(topicName)) { + subscribeGroupToTopic(groupId, topicName); + } + }); + } } /** @@ -874,4 +1015,32 @@ public class GroupMetadataManager { consumerGroup.updateMember(newMember); } } + + /** + * A new metadata image is available. + * + * @param newImage The new metadata image. + * @param delta The delta image. + */ + public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) { + metadataImage = newImage; + + // Notify all the groups subscribed to the created, updated or + // deleted topics. + Set allGroupIds = new HashSet<>(); + delta.topicsDelta().changedTopics().forEach((topicId, topicDelta) -> { + String topicName = topicDelta.name(); + allGroupIds.addAll(groupsSubscribedToTopic(topicName)); + }); + delta.topicsDelta().deletedTopicIds().forEach(topicId -> { + TopicImage topicImage = delta.image().topics().getTopic(topicId); + allGroupIds.addAll(groupsSubscribedToTopic(topicImage.name())); + }); + allGroupIds.forEach(groupId -> { + Group group = groups.get(groupId); + if (group != null && group.type() == Group.GroupType.CONSUMER) { + ((ConsumerGroup) group).requestMetadataRefresh(); + } + }); + } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinator.java index 89d08d450a2..201da405e6d 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinator.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinator.java @@ -36,6 +36,8 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmen import org.apache.kafka.coordinator.group.runtime.Coordinator; import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilder; import org.apache.kafka.coordinator.group.runtime.CoordinatorResult; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.timeline.SnapshotRegistry; @@ -130,6 +132,28 @@ public class ReplicatedGroupCoordinator implements Coordinator { return groupMetadataManager.consumerGroupHeartbeat(context, request); } + /** + * The coordinator has been loaded. This is used to apply any + * post loading operations (e.g. registering timers). + * + * @param newImage The metadata image. + */ + @Override + public void onLoaded(MetadataImage newImage) { + groupMetadataManager.onNewMetadataImage(newImage, new MetadataDelta(newImage)); + } + + /** + * A new metadata image is available. + * + * @param newImage The new metadata image. + * @param delta The delta image. + */ + @Override + public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) { + groupMetadataManager.onNewMetadataImage(newImage, delta); + } + /** * @return The ApiMessage or null. */ diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java index 9df00733b30..6f682d4e82f 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java @@ -58,6 +58,18 @@ public class ConsumerGroup implements Group { } } + public static class DeadlineAndEpoch { + static final DeadlineAndEpoch EMPTY = new DeadlineAndEpoch(0L, 0); + + public final long deadlineMs; + public final int epoch; + + DeadlineAndEpoch(long deadlineMs, int epoch) { + this.deadlineMs = deadlineMs; + this.epoch = epoch; + } + } + /** * The snapshot registry. */ @@ -119,6 +131,18 @@ public class ConsumerGroup implements Group { */ private final TimelineHashMap> currentPartitionEpoch; + /** + * The metadata refresh deadline. It consists of a timestamp in milliseconds together with + * the group epoch at the time of setting it. The metadata refresh time is considered as a + * soft state (read that it is not stored in a timeline data structure). It is like this + * because it is not persisted to the log. The group epoch is here to ensure that the + * metadata refresh deadline is invalidated if the group epoch does not correspond to + * the current group epoch. This can happen if the metadata refresh deadline is updated + * after having refreshed the metadata but the write operation failed. In this case, the + * time is not automatically rolled back. + */ + private DeadlineAndEpoch metadataRefreshDeadline = DeadlineAndEpoch.EMPTY; + public ConsumerGroup( SnapshotRegistry snapshotRegistry, String groupId @@ -249,8 +273,10 @@ public class ConsumerGroup implements Group { * @param memberId The member id to remove. */ public void removeMember(String memberId) { - ConsumerGroupMember member = members.remove(memberId); - maybeRemovePartitionEpoch(member); + ConsumerGroupMember oldMember = members.remove(memberId); + maybeUpdateSubscribedTopicNames(oldMember, null); + maybeUpdateServerAssignors(oldMember, null); + maybeRemovePartitionEpoch(oldMember); maybeUpdateGroupState(); } @@ -279,6 +305,13 @@ public class ConsumerGroup implements Group { return Collections.unmodifiableMap(members); } + /** + * @return An immutable Set containing all the subscribed topic names. + */ + public Set subscribedTopicNames() { + return Collections.unmodifiableSet(subscribedTopicNames.keySet()); + } + /** * Returns the target assignment of the member. * @@ -423,6 +456,47 @@ public class ConsumerGroup implements Group { return Collections.unmodifiableMap(newSubscriptionMetadata); } + /** + * Updates the metadata refresh deadline. + * + * @param deadlineMs The deadline in milliseconds. + * @param groupEpoch The associated group epoch. + */ + public void setMetadataRefreshDeadline( + long deadlineMs, + int groupEpoch + ) { + this.metadataRefreshDeadline = new DeadlineAndEpoch(deadlineMs, groupEpoch); + } + + /** + * Requests a metadata refresh. + */ + public void requestMetadataRefresh() { + this.metadataRefreshDeadline = DeadlineAndEpoch.EMPTY; + } + + /** + * Checks if a metadata refresh is required. A refresh is required in two cases: + * 1) The deadline is smaller or equal to the current time; + * 2) The group epoch associated with the deadline is larger than + * the current group epoch. This means that the operations which updated + * the deadline failed. + * + * @param currentTimeMs The current time in milliseconds. + * @return A boolean indicating whether a refresh is required or not. + */ + public boolean hasMetadataExpired(long currentTimeMs) { + return currentTimeMs >= metadataRefreshDeadline.deadlineMs || groupEpoch() < metadataRefreshDeadline.epoch; + } + + /** + * @return The metadata refresh deadline. + */ + public DeadlineAndEpoch metadataRefreshDeadline() { + return metadataRefreshDeadline; + } + /** * Updates the current state of the group. */ diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Coordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Coordinator.java index 5ab88a0efa6..8189e1ab89b 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Coordinator.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Coordinator.java @@ -16,6 +16,9 @@ */ package org.apache.kafka.coordinator.group.runtime; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; + /** * Coordinator is basically a replicated state machine managed by the * {@link CoordinatorRuntime}. @@ -25,8 +28,19 @@ public interface Coordinator extends CoordinatorPlayback { /** * The coordinator has been loaded. This is used to apply any * post loading operations (e.g. registering timers). + * + * @param newImage The metadata image. */ - default void onLoaded() {} + default void onLoaded(MetadataImage newImage) {} + + /** + * A new metadata image is available. This is only called after {@link Coordinator#onLoaded(MetadataImage)} + * is called to signal that the coordinator has been fully loaded. + * + * @param newImage The new metadata image. + * @param delta The delta image. + */ + default void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {} /** * The coordinator has been unloaded. This is used to apply diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java index a18964cc3bb..00c8937f436 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java @@ -24,6 +24,8 @@ import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.deferred.DeferredEvent; import org.apache.kafka.deferred.DeferredEventQueue; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; import org.apache.kafka.timeline.SnapshotRegistry; import org.slf4j.Logger; @@ -349,7 +351,7 @@ public class CoordinatorRuntime, U> implements AutoClos state = CoordinatorState.ACTIVE; snapshotRegistry.getOrCreateSnapshot(0); partitionWriter.registerListener(tp, highWatermarklistener); - coordinator.onLoaded(); + coordinator.onLoaded(metadataImage); break; case FAILED: @@ -807,6 +809,11 @@ public class CoordinatorRuntime, U> implements AutoClos */ private final AtomicBoolean isRunning = new AtomicBoolean(true); + /** + * The latest known metadata image. + */ + private volatile MetadataImage metadataImage = MetadataImage.EMPTY; + /** * Constructor. * @@ -1083,6 +1090,37 @@ public class CoordinatorRuntime, U> implements AutoClos }); } + /** + * A new metadata image is available. + * + * @param newImage The new metadata image. + * @param delta The metadata delta. + */ + public void onNewMetadataImage( + MetadataImage newImage, + MetadataDelta delta + ) { + throwIfNotRunning(); + log.debug("Scheduling applying of a new metadata image with offset {}.", newImage.offset()); + + // Update global image. + metadataImage = newImage; + + // Push an event for each coordinator. + coordinators.keySet().forEach(tp -> { + scheduleInternalOperation("UpdateImage(tp=" + tp + ", offset=" + newImage.offset() + ")", tp, () -> { + CoordinatorContext context = contextOrThrow(tp); + if (context.state == CoordinatorState.ACTIVE) { + log.debug("Applying new metadata image with offset {} to {}.", newImage.offset(), tp); + context.coordinator.onNewMetadataImage(newImage, delta); + } else { + log.debug("Ignoring new metadata image with offset {} for {} because the coordinator is not active.", + newImage.offset(), tp); + } + }); + }); + } + /** * Closes the runtime. This closes all the coordinators currently registered * in the runtime. diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 2296d58ecd7..41c2b1cb8be 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.UnsupportedAssignorException; import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.RemoveTopicRecord; import org.apache.kafka.common.metadata.TopicRecord; import org.apache.kafka.common.network.ClientInformation; import org.apache.kafka.common.network.ListenerName; @@ -37,6 +38,8 @@ import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; import org.apache.kafka.coordinator.group.assignor.GroupAssignment; import org.apache.kafka.coordinator.group.assignor.MemberAssignment; @@ -59,8 +62,10 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmen import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey; import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue; import org.apache.kafka.coordinator.group.runtime.CoordinatorResult; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.MetadataProvenance; import org.apache.kafka.image.TopicImage; -import org.apache.kafka.image.TopicsDelta; import org.apache.kafka.image.TopicsImage; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.timeline.SnapshotRegistry; @@ -78,13 +83,16 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -113,10 +121,10 @@ public class GroupMetadataManagerTest { } } - public static class TopicsImageBuilder { - private TopicsDelta delta = new TopicsDelta(TopicsImage.EMPTY); + public static class MetadataImageBuilder { + private MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY); - public TopicsImageBuilder addTopic( + public MetadataImageBuilder addTopic( Uuid topicId, String topicName, int numPartitions @@ -130,8 +138,8 @@ public class GroupMetadataManagerTest { return this; } - public TopicsImage build() { - return delta.apply(); + public MetadataImage build() { + return delta.apply(MetadataProvenance.EMPTY); } } @@ -141,6 +149,7 @@ public class GroupMetadataManagerTest { private int assignmentEpoch; private final Map members = new HashMap<>(); private final Map assignments = new HashMap<>(); + private Map subscriptionMetadata; public ConsumerGroupBuilder(String groupId, int groupEpoch) { this.groupId = groupId; @@ -153,6 +162,11 @@ public class GroupMetadataManagerTest { return this; } + public ConsumerGroupBuilder withSubscriptionMetadata(Map subscriptionMetadata) { + this.subscriptionMetadata = subscriptionMetadata; + return this; + } + public ConsumerGroupBuilder withAssignment(String memberId, Map> assignment) { this.assignments.put(memberId, new Assignment(assignment)); return this; @@ -172,19 +186,21 @@ public class GroupMetadataManagerTest { }); // Add subscription metadata. - Map subscriptionMetadata = new HashMap<>(); - members.forEach((memberId, member) -> { - member.subscribedTopicNames().forEach(topicName -> { - TopicImage topicImage = topicsImage.getTopic(topicName); - if (topicImage != null) { - subscriptionMetadata.put(topicName, new TopicMetadata( - topicImage.id(), - topicImage.name(), - topicImage.partitions().size() - )); - } + if (subscriptionMetadata == null) { + subscriptionMetadata = new HashMap<>(); + members.forEach((memberId, member) -> { + member.subscribedTopicNames().forEach(topicName -> { + TopicImage topicImage = topicsImage.getTopic(topicName); + if (topicImage != null) { + subscriptionMetadata.put(topicName, new TopicMetadata( + topicImage.id(), + topicImage.name(), + topicImage.partitions().size() + )); + } + }); }); - }); + } if (!subscriptionMetadata.isEmpty()) { records.add(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata)); @@ -212,15 +228,17 @@ public class GroupMetadataManagerTest { static class GroupMetadataManagerTestContext { static class Builder { + final private Time time = new MockTime(); final private LogContext logContext = new LogContext(); final private SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext); - private TopicsImage topicsImage; + private MetadataImage metadataImage; private List assignors; private List consumerGroupBuilders = new ArrayList<>(); private int consumerGroupMaxSize = Integer.MAX_VALUE; + private int consumerGroupMetadataRefreshIntervalMs = Integer.MAX_VALUE; - public Builder withTopicsImage(TopicsImage topicsImage) { - this.topicsImage = topicsImage; + public Builder withMetadataImage(MetadataImage metadataImage) { + this.metadataImage = metadataImage; return this; } @@ -239,24 +257,32 @@ public class GroupMetadataManagerTest { return this; } + public Builder withConsumerGroupMetadataRefreshIntervalMs(int consumerGroupMetadataRefreshIntervalMs) { + this.consumerGroupMetadataRefreshIntervalMs = consumerGroupMetadataRefreshIntervalMs; + return this; + } + public GroupMetadataManagerTestContext build() { - if (topicsImage == null) topicsImage = TopicsImage.EMPTY; + if (metadataImage == null) metadataImage = MetadataImage.EMPTY; if (assignors == null) assignors = Collections.emptyList(); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext( + time, snapshotRegistry, new GroupMetadataManager.Builder() .withSnapshotRegistry(snapshotRegistry) .withLogContext(logContext) - .withTopicsImage(topicsImage) + .withTime(time) + .withMetadataImage(metadataImage) .withConsumerGroupHeartbeatInterval(5000) .withConsumerGroupMaxSize(consumerGroupMaxSize) .withAssignors(assignors) + .withConsumerGroupMetadataRefreshIntervalMs(consumerGroupMetadataRefreshIntervalMs) .build() ); consumerGroupBuilders.forEach(builder -> { - builder.build(topicsImage).forEach(context::replay); + builder.build(metadataImage.topics()).forEach(context::replay); }); context.commit(); @@ -265,6 +291,7 @@ public class GroupMetadataManagerTest { } } + final Time time; final SnapshotRegistry snapshotRegistry; final GroupMetadataManager groupMetadataManager; @@ -272,9 +299,11 @@ public class GroupMetadataManagerTest { long lastWrittenOffset = 0L; public GroupMetadataManagerTestContext( + Time time, SnapshotRegistry snapshotRegistry, GroupMetadataManager groupMetadataManager ) { + this.time = time; this.snapshotRegistry = snapshotRegistry; this.groupMetadataManager = groupMetadataManager; } @@ -486,7 +515,7 @@ public class GroupMetadataManagerTest { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withAssignors(Collections.singletonList(assignor)) - .withTopicsImage(TopicsImage.EMPTY) + .withMetadataImage(MetadataImage.EMPTY) .build(); assignor.prepareGroupAssignment(new GroupAssignment( @@ -675,7 +704,7 @@ public class GroupMetadataManagerTest { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withAssignors(Collections.singletonList(assignor)) - .withTopicsImage(new TopicsImageBuilder() + .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) .build()) @@ -764,7 +793,7 @@ public class GroupMetadataManagerTest { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withAssignors(Collections.singletonList(assignor)) - .withTopicsImage(new TopicsImageBuilder() + .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) .build()) @@ -865,7 +894,7 @@ public class GroupMetadataManagerTest { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withAssignors(Collections.singletonList(assignor)) - .withTopicsImage(new TopicsImageBuilder() + .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) .build()) @@ -1009,7 +1038,7 @@ public class GroupMetadataManagerTest { // Consumer group with two members. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withAssignors(Collections.singletonList(assignor)) - .withTopicsImage(new TopicsImageBuilder() + .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) .addTopic(zarTopicId, zarTopicName, 1) @@ -1100,7 +1129,7 @@ public class GroupMetadataManagerTest { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withAssignors(Collections.singletonList(assignor)) - .withTopicsImage(new TopicsImageBuilder() + .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) .build()) @@ -1546,7 +1575,7 @@ public class GroupMetadataManagerTest { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withAssignors(Collections.singletonList(assignor)) - .withTopicsImage(new TopicsImageBuilder() + .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .build()) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) @@ -1794,7 +1823,7 @@ public class GroupMetadataManagerTest { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withAssignors(Collections.singletonList(assignor)) - .withTopicsImage(new TopicsImageBuilder() + .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) .build()) @@ -1912,7 +1941,7 @@ public class GroupMetadataManagerTest { GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withAssignors(Collections.singletonList(assignor)) - .withTopicsImage(new TopicsImageBuilder() + .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) .build()) @@ -1932,6 +1961,435 @@ public class GroupMetadataManagerTest { .setTopicPartitions(Collections.emptyList()))); } + @Test + public void testSubscriptionMetadataRefreshedAfterGroupIsLoaded() { + String groupId = "fooup"; + // Use a static member id as it makes the test easier. + String memberId = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + // Create a context with one consumer group containing one member. + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupMetadataRefreshIntervalMs(5 * 60 * 1000) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setTargetMemberEpoch(10) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2))) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2))) + .withAssignmentEpoch(10) + .withSubscriptionMetadata(new HashMap() { + { + // foo only has 3 partitions stored in the metadata but foo has + // 6 partitions the metadata image. + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3)); + } + })) + .build(); + + // The metadata refresh flag should be true. + ConsumerGroup consumerGroup = context.groupMetadataManager + .getOrMaybeCreateConsumerGroup(groupId, false); + assertTrue(consumerGroup.hasMetadataExpired(context.time.milliseconds())); + + // Prepare the assignment result. + assignor.prepareGroupAssignment(new GroupAssignment( + Collections.singletonMap(memberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) + ))) + )); + + // Heartbeat. + CoordinatorResult result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(10)); + + // The member gets partitions 3, 4 and 5 assigned. + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId) + .setMemberEpoch(11) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() + .setAssignedTopicPartitions(Arrays.asList( + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(Arrays.asList(0, 1, 2, 3, 4, 5)) + ))), + result.response() + ); + + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(memberId) + .setMemberEpoch(11) + .setPreviousMemberEpoch(10) + .setTargetMemberEpoch(11) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) + .build(); + + List expectedRecords = Arrays.asList( + RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6)); + } + }), + RecordHelpers.newGroupEpochRecord(groupId, 11), + RecordHelpers.newTargetAssignmentRecord(groupId, memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) + )), + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11), + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember) + ); + + assertRecordsEquals(expectedRecords, result.records()); + + // Check next refresh time. + assertFalse(consumerGroup.hasMetadataExpired(context.time.milliseconds())); + assertEquals(context.time.milliseconds() + 5 * 60 * 1000, consumerGroup.metadataRefreshDeadline().deadlineMs); + assertEquals(11, consumerGroup.metadataRefreshDeadline().epoch); + } + + @Test + public void testSubscriptionMetadataRefreshedAgainAfterWriteFailure() { + String groupId = "fooup"; + // Use a static member id as it makes the test easier. + String memberId = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + // Create a context with one consumer group containing one member. + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupMetadataRefreshIntervalMs(5 * 60 * 1000) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setTargetMemberEpoch(10) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2))) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2))) + .withAssignmentEpoch(10) + .withSubscriptionMetadata(new HashMap() { + { + // foo only has 3 partitions stored in the metadata but foo has + // 6 partitions the metadata image. + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3)); + } + })) + .build(); + + // The metadata refresh flag should be true. + ConsumerGroup consumerGroup = context.groupMetadataManager + .getOrMaybeCreateConsumerGroup(groupId, false); + assertTrue(consumerGroup.hasMetadataExpired(context.time.milliseconds())); + + // Prepare the assignment result. + assignor.prepareGroupAssignment(new GroupAssignment( + Collections.singletonMap(memberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) + ))) + )); + + // Heartbeat. + context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(10)); + + // The metadata refresh flag is set to a future time. + assertFalse(consumerGroup.hasMetadataExpired(context.time.milliseconds())); + assertEquals(context.time.milliseconds() + 5 * 60 * 1000, consumerGroup.metadataRefreshDeadline().deadlineMs); + assertEquals(11, consumerGroup.metadataRefreshDeadline().epoch); + + // Rollback the uncommitted changes. This does not rollback the metadata flag + // because it is not using a timeline data structure. + context.rollback(); + + // However, the next heartbeat should detect the divergence based on the epoch and trigger + // a metadata refresh. + CoordinatorResult result = context.consumerGroupHeartbeat( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(10)); + + + // The member gets partitions 3, 4 and 5 assigned. + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId) + .setMemberEpoch(11) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() + .setAssignedTopicPartitions(Arrays.asList( + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(Arrays.asList(0, 1, 2, 3, 4, 5)) + ))), + result.response() + ); + + ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(memberId) + .setMemberEpoch(11) + .setPreviousMemberEpoch(10) + .setTargetMemberEpoch(11) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5))) + .build(); + + List expectedRecords = Arrays.asList( + RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap() { + { + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6)); + } + }), + RecordHelpers.newGroupEpochRecord(groupId, 11), + RecordHelpers.newTargetAssignmentRecord(groupId, memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) + )), + RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11), + RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember) + ); + + assertRecordsEquals(expectedRecords, result.records()); + + // Check next refresh time. + assertFalse(consumerGroup.hasMetadataExpired(context.time.milliseconds())); + assertEquals(context.time.milliseconds() + 5 * 60 * 1000, consumerGroup.metadataRefreshDeadline().deadlineMs); + assertEquals(11, consumerGroup.metadataRefreshDeadline().epoch); + } + + @Test + public void testGroupIdsByTopics() { + String groupId1 = "group1"; + String groupId2 = "group2"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .build(); + + assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("foo")); + assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("bar")); + assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("zar")); + + // M1 in group 1 subscribes to foo and bar. + context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId1, + new ConsumerGroupMember.Builder("group1-m1") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .build())); + + assertEquals(mkSet(groupId1), context.groupMetadataManager.groupsSubscribedToTopic("foo")); + assertEquals(mkSet(groupId1), context.groupMetadataManager.groupsSubscribedToTopic("bar")); + assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("zar")); + + // M1 in group 2 subscribes to foo, bar and zar. + context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId2, + new ConsumerGroupMember.Builder("group2-m1") + .setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar")) + .build())); + + assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("foo")); + assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("bar")); + assertEquals(mkSet(groupId2), context.groupMetadataManager.groupsSubscribedToTopic("zar")); + + // M2 in group 1 subscribes to bar and zar. + context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId1, + new ConsumerGroupMember.Builder("group1-m2") + .setSubscribedTopicNames(Arrays.asList("bar", "zar")) + .build())); + + assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("foo")); + assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("bar")); + assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("zar")); + + // M2 in group 2 subscribes to foo and bar. + context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId2, + new ConsumerGroupMember.Builder("group2-m2") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .build())); + + assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("foo")); + assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("bar")); + assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("zar")); + + // M1 in group 1 is removed. + context.replay(RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId1, "group1-m1")); + context.replay(RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId1, "group1-m1")); + + assertEquals(mkSet(groupId2), context.groupMetadataManager.groupsSubscribedToTopic("foo")); + assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("bar")); + assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("zar")); + + // M1 in group 2 subscribes to nothing. + context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId2, + new ConsumerGroupMember.Builder("group2-m1") + .setSubscribedTopicNames(Collections.emptyList()) + .build())); + + assertEquals(mkSet(groupId2), context.groupMetadataManager.groupsSubscribedToTopic("foo")); + assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("bar")); + assertEquals(mkSet(groupId1), context.groupMetadataManager.groupsSubscribedToTopic("zar")); + + // M2 in group 2 subscribes to foo. + context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId2, + new ConsumerGroupMember.Builder("group2-m2") + .setSubscribedTopicNames(Arrays.asList("foo")) + .build())); + + assertEquals(mkSet(groupId2), context.groupMetadataManager.groupsSubscribedToTopic("foo")); + assertEquals(mkSet(groupId1), context.groupMetadataManager.groupsSubscribedToTopic("bar")); + assertEquals(mkSet(groupId1), context.groupMetadataManager.groupsSubscribedToTopic("zar")); + + // M2 in group 2 subscribes to nothing. + context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId2, + new ConsumerGroupMember.Builder("group2-m2") + .setSubscribedTopicNames(Collections.emptyList()) + .build())); + + assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("foo")); + assertEquals(mkSet(groupId1), context.groupMetadataManager.groupsSubscribedToTopic("bar")); + assertEquals(mkSet(groupId1), context.groupMetadataManager.groupsSubscribedToTopic("zar")); + + // M2 in group 1 subscribes to nothing. + context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId1, + new ConsumerGroupMember.Builder("group1-m2") + .setSubscribedTopicNames(Collections.emptyList()) + .build())); + + assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("foo")); + assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("bar")); + assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("zar")); + } + + @Test + public void testOnNewMetadataImage() { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) + .build(); + + // M1 in group 1 subscribes to a and b. + context.replay(RecordHelpers.newMemberSubscriptionRecord("group1", + new ConsumerGroupMember.Builder("group1-m1") + .setSubscribedTopicNames(Arrays.asList("a", "b")) + .build())); + + // M1 in group 2 subscribes to b and c. + context.replay(RecordHelpers.newMemberSubscriptionRecord("group2", + new ConsumerGroupMember.Builder("group2-m1") + .setSubscribedTopicNames(Arrays.asList("b", "c")) + .build())); + + // M1 in group 3 subscribes to d. + context.replay(RecordHelpers.newMemberSubscriptionRecord("group3", + new ConsumerGroupMember.Builder("group3-m1") + .setSubscribedTopicNames(Arrays.asList("d")) + .build())); + + // M1 in group 4 subscribes to e. + context.replay(RecordHelpers.newMemberSubscriptionRecord("group4", + new ConsumerGroupMember.Builder("group4-m1") + .setSubscribedTopicNames(Arrays.asList("e")) + .build())); + + // M1 in group 5 subscribes to f. + context.replay(RecordHelpers.newMemberSubscriptionRecord("group5", + new ConsumerGroupMember.Builder("group5-m1") + .setSubscribedTopicNames(Arrays.asList("f")) + .build())); + + // Ensures that all refresh flags are set to the future. + Arrays.asList("group1", "group2", "group3", "group4", "group5").forEach(groupId -> { + ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false); + group.setMetadataRefreshDeadline(context.time.milliseconds() + 5000L, 0); + assertFalse(group.hasMetadataExpired(context.time.milliseconds())); + }); + + // Update the metadata image. + Uuid topicA = Uuid.randomUuid(); + Uuid topicB = Uuid.randomUuid(); + Uuid topicC = Uuid.randomUuid(); + Uuid topicD = Uuid.randomUuid(); + Uuid topicE = Uuid.randomUuid(); + + // Create a first base image with topic a, b, c and d. + MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY); + delta.replay(new TopicRecord().setTopicId(topicA).setName("a")); + delta.replay(new PartitionRecord().setTopicId(topicA).setPartitionId(0)); + delta.replay(new TopicRecord().setTopicId(topicB).setName("b")); + delta.replay(new PartitionRecord().setTopicId(topicB).setPartitionId(0)); + delta.replay(new TopicRecord().setTopicId(topicC).setName("c")); + delta.replay(new PartitionRecord().setTopicId(topicC).setPartitionId(0)); + delta.replay(new TopicRecord().setTopicId(topicD).setName("d")); + delta.replay(new PartitionRecord().setTopicId(topicD).setPartitionId(0)); + MetadataImage image = delta.apply(MetadataProvenance.EMPTY); + + // Create a delta which updates topic B, deletes topic D and creates topic E. + delta = new MetadataDelta(image); + delta.replay(new PartitionRecord().setTopicId(topicB).setPartitionId(2)); + delta.replay(new RemoveTopicRecord().setTopicId(topicD)); + delta.replay(new TopicRecord().setTopicId(topicE).setName("e")); + delta.replay(new PartitionRecord().setTopicId(topicE).setPartitionId(1)); + image = delta.apply(MetadataProvenance.EMPTY); + + // Update metadata image with the delta. + context.groupMetadataManager.onNewMetadataImage(image, delta); + + // Verify the groups. + Arrays.asList("group1", "group2", "group3", "group4").forEach(groupId -> { + ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false); + assertTrue(group.hasMetadataExpired(context.time.milliseconds())); + }); + + Arrays.asList("group5").forEach(groupId -> { + ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false); + assertFalse(group.hasMetadataExpired(context.time.milliseconds())); + }); + + // Verify image. + assertEquals(image, context.groupMetadataManager.image()); + } + private void assertUnorderedListEquals( List expected, List actual diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java index 2454188ed94..7306e3a3c5d 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java @@ -19,8 +19,9 @@ package org.apache.kafka.coordinator.group.consumer; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.coordinator.group.GroupMetadataManagerTest; -import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.image.MetadataImage; import org.apache.kafka.timeline.SnapshotRegistry; import org.junit.jupiter.api.Test; @@ -392,7 +393,7 @@ public class ConsumerGroupTest { Uuid barTopicId = Uuid.randomUuid(); Uuid zarTopicId = Uuid.randomUuid(); - TopicsImage image = new GroupMetadataManagerTest.TopicsImageBuilder() + MetadataImage image = new GroupMetadataManagerTest.MetadataImageBuilder() .addTopic(fooTopicId, "foo", 1) .addTopic(barTopicId, "bar", 2) .addTopic(zarTopicId, "zar", 3) @@ -416,7 +417,7 @@ public class ConsumerGroupTest { consumerGroup.computeSubscriptionMetadata( null, null, - image + image.topics() ) ); @@ -428,7 +429,7 @@ public class ConsumerGroupTest { consumerGroup.computeSubscriptionMetadata( null, member1, - image + image.topics() ) ); @@ -443,7 +444,7 @@ public class ConsumerGroupTest { consumerGroup.computeSubscriptionMetadata( null, null, - image + image.topics() ) ); @@ -453,7 +454,7 @@ public class ConsumerGroupTest { consumerGroup.computeSubscriptionMetadata( member1, null, - image + image.topics() ) ); @@ -466,7 +467,7 @@ public class ConsumerGroupTest { consumerGroup.computeSubscriptionMetadata( null, member2, - image + image.topics() ) ); @@ -482,7 +483,7 @@ public class ConsumerGroupTest { consumerGroup.computeSubscriptionMetadata( null, null, - image + image.topics() ) ); @@ -494,7 +495,7 @@ public class ConsumerGroupTest { consumerGroup.computeSubscriptionMetadata( member2, null, - image + image.topics() ) ); @@ -506,7 +507,7 @@ public class ConsumerGroupTest { consumerGroup.computeSubscriptionMetadata( member1, null, - image + image.topics() ) ); @@ -520,7 +521,7 @@ public class ConsumerGroupTest { consumerGroup.computeSubscriptionMetadata( null, member3, - image + image.topics() ) ); @@ -537,8 +538,57 @@ public class ConsumerGroupTest { consumerGroup.computeSubscriptionMetadata( null, null, - image + image.topics() ) ); } + + @Test + public void testMetadataRefreshDeadline() { + MockTime time = new MockTime(); + ConsumerGroup group = createConsumerGroup("group-foo"); + + // Group epoch starts at 0. + assertEquals(0, group.groupEpoch()); + + // The refresh time deadline should be empty when the group is created or loaded. + assertTrue(group.hasMetadataExpired(time.milliseconds())); + assertEquals(0L, group.metadataRefreshDeadline().deadlineMs); + assertEquals(0, group.metadataRefreshDeadline().epoch); + + // Set the refresh deadline. The metadata remains valid because the deadline + // has not past and the group epoch is correct. + group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch()); + assertFalse(group.hasMetadataExpired(time.milliseconds())); + assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs); + assertEquals(group.groupEpoch(), group.metadataRefreshDeadline().epoch); + + // Advance past the deadline. The metadata should have expired. + time.sleep(1001L); + assertTrue(group.hasMetadataExpired(time.milliseconds())); + + // Set the refresh time deadline with a higher group epoch. The metadata is considered + // as expired because the group epoch attached to the deadline is higher than the + // current group epoch. + group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch() + 1); + assertTrue(group.hasMetadataExpired(time.milliseconds())); + assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs); + assertEquals(group.groupEpoch() + 1, group.metadataRefreshDeadline().epoch); + + // Advance the group epoch. + group.setGroupEpoch(group.groupEpoch() + 1); + + // Set the refresh deadline. The metadata remains valid because the deadline + // has not past and the group epoch is correct. + group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch()); + assertFalse(group.hasMetadataExpired(time.milliseconds())); + assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs); + assertEquals(group.groupEpoch(), group.metadataRefreshDeadline().epoch); + + // Request metadata refresh. The metadata expires immediately. + group.requestMetadataRefresh(); + assertTrue(group.hasMetadataExpired(time.milliseconds())); + assertEquals(0L, group.metadataRefreshDeadline().deadlineMs); + assertEquals(0, group.metadataRefreshDeadline().epoch); + } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java index 7c40721d5ae..c130fa474ad 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java @@ -20,6 +20,9 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.NotCoordinatorException; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.MetadataProvenance; import org.apache.kafka.timeline.SnapshotRegistry; import org.apache.kafka.timeline.TimelineHashSet; import org.junit.jupiter.api.Test; @@ -222,7 +225,7 @@ public class CoordinatorRuntimeTest { assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state); // Verify that onLoaded is called. - verify(coordinator, times(1)).onLoaded(); + verify(coordinator, times(1)).onLoaded(MetadataImage.EMPTY); // Verify that the listener is registered. verify(writer, times(1)).registerListener( @@ -834,4 +837,60 @@ public class CoordinatorRuntimeTest { // Verify that the loader was closed. verify(loader).close(); } + + @Test + public void testOnNewMetadataImage() { + TopicPartition tp0 = new TopicPartition("__consumer_offsets", 0); + TopicPartition tp1 = new TopicPartition("__consumer_offsets", 1); + + MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class); + MockPartitionWriter writer = mock(MockPartitionWriter.class); + MockCoordinatorBuilderSupplier supplier = mock(MockCoordinatorBuilderSupplier.class); + MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class); + + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() + .withLoader(loader) + .withEventProcessor(new MockEventProcessor()) + .withPartitionWriter(writer) + .withCoordinatorBuilderSupplier(supplier) + .build(); + + MockCoordinator coordinator0 = mock(MockCoordinator.class); + MockCoordinator coordinator1 = mock(MockCoordinator.class); + + when(supplier.get()).thenReturn(builder); + when(builder.withSnapshotRegistry(any())).thenReturn(builder); + when(builder.withLogContext(any())).thenReturn(builder); + when(builder.build()) + .thenReturn(coordinator0) + .thenReturn(coordinator1); + + CompletableFuture future0 = new CompletableFuture<>(); + when(loader.load(tp0, coordinator0)).thenReturn(future0); + + CompletableFuture future1 = new CompletableFuture<>(); + when(loader.load(tp1, coordinator1)).thenReturn(future1); + + runtime.scheduleLoadOperation(tp0, 0); + runtime.scheduleLoadOperation(tp1, 0); + + // Coordinator 0 is loaded. It should get the current image + // that is the empty one. + future0.complete(null); + verify(coordinator0).onLoaded(MetadataImage.EMPTY); + + // Publish a new image. + MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY); + MetadataImage newImage = delta.apply(MetadataProvenance.EMPTY); + runtime.onNewMetadataImage(newImage, delta); + + // Coordinator 0 should be notified about it. + verify(coordinator0).onNewMetadataImage(newImage, delta); + + // Coordinator 1 is loaded. It should get the current image + // that is the new image. + future1.complete(null); + verify(coordinator1).onLoaded(newImage); + } }