diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index d7e36890cdd..6e2a8c0ca4e 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -323,7 +323,7 @@
+ files="(GroupMetadataManager|ConsumerGroupTest|GroupMetadataManagerTest).java"/>
assignors = null;
- private TopicsImage topicsImage = null;
private int consumerGroupMaxSize = Integer.MAX_VALUE;
private int consumerGroupHeartbeatIntervalMs = 5000;
+ private int consumerGroupMetadataRefreshIntervalMs = Integer.MAX_VALUE;
+ private MetadataImage metadataImage = null;
Builder withLogContext(LogContext logContext) {
this.logContext = logContext;
@@ -103,6 +110,11 @@ public class GroupMetadataManager {
return this;
}
+ Builder withTime(Time time) {
+ this.time = time;
+ return this;
+ }
+
Builder withAssignors(List assignors) {
this.assignors = assignors;
return this;
@@ -118,15 +130,21 @@ public class GroupMetadataManager {
return this;
}
- Builder withTopicsImage(TopicsImage topicsImage) {
- this.topicsImage = topicsImage;
+ Builder withConsumerGroupMetadataRefreshIntervalMs(int consumerGroupMetadataRefreshIntervalMs) {
+ this.consumerGroupMetadataRefreshIntervalMs = consumerGroupMetadataRefreshIntervalMs;
+ return this;
+ }
+
+ Builder withMetadataImage(MetadataImage metadataImage) {
+ this.metadataImage = metadataImage;
return this;
}
GroupMetadataManager build() {
if (logContext == null) logContext = new LogContext();
if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
- if (topicsImage == null) topicsImage = TopicsImage.EMPTY;
+ if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
+ if (time == null) time = Time.SYSTEM;
if (assignors == null || assignors.isEmpty()) {
throw new IllegalStateException("Assignors must be set before building.");
@@ -135,10 +153,12 @@ public class GroupMetadataManager {
return new GroupMetadataManager(
snapshotRegistry,
logContext,
+ time,
assignors,
- topicsImage,
+ metadataImage,
consumerGroupMaxSize,
- consumerGroupHeartbeatIntervalMs
+ consumerGroupHeartbeatIntervalMs,
+ consumerGroupMetadataRefreshIntervalMs
);
}
}
@@ -153,6 +173,11 @@ public class GroupMetadataManager {
*/
private final SnapshotRegistry snapshotRegistry;
+ /**
+ * The system time.
+ */
+ private final Time time;
+
/**
* The supported partition assignors keyed by their name.
*/
@@ -168,6 +193,11 @@ public class GroupMetadataManager {
*/
private final TimelineHashMap groups;
+ /**
+ * The group ids keyed by topic names.
+ */
+ private final TimelineHashMap> groupsByTopics;
+
/**
* The maximum number of members allowed in a single consumer group.
*/
@@ -179,26 +209,43 @@ public class GroupMetadataManager {
private final int consumerGroupHeartbeatIntervalMs;
/**
- * The topics metadata (or image).
+ * The metadata refresh interval.
*/
- private TopicsImage topicsImage;
+ private final int consumerGroupMetadataRefreshIntervalMs;
+
+ /**
+ * The metadata image.
+ */
+ private MetadataImage metadataImage;
private GroupMetadataManager(
SnapshotRegistry snapshotRegistry,
LogContext logContext,
+ Time time,
List assignors,
- TopicsImage topicsImage,
+ MetadataImage metadataImage,
int consumerGroupMaxSize,
- int consumerGroupHeartbeatIntervalMs
+ int consumerGroupHeartbeatIntervalMs,
+ int consumerGroupMetadataRefreshIntervalMs
) {
this.log = logContext.logger(GroupMetadataManager.class);
this.snapshotRegistry = snapshotRegistry;
- this.topicsImage = topicsImage;
+ this.time = time;
+ this.metadataImage = metadataImage;
this.assignors = assignors.stream().collect(Collectors.toMap(PartitionAssignor::name, Function.identity()));
this.defaultAssignor = assignors.get(0);
this.groups = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.groupsByTopics = new TimelineHashMap<>(snapshotRegistry, 0);
this.consumerGroupMaxSize = consumerGroupMaxSize;
this.consumerGroupHeartbeatIntervalMs = consumerGroupHeartbeatIntervalMs;
+ this.consumerGroupMetadataRefreshIntervalMs = consumerGroupMetadataRefreshIntervalMs;
+ }
+
+ /**
+ * @return The current metadata image used by the group metadata manager.
+ */
+ public MetadataImage image() {
+ return metadataImage;
}
/**
@@ -472,7 +519,8 @@ public class GroupMetadataManager {
String assignorName,
List ownedTopicPartitions
) throws ApiException {
- List records = new ArrayList<>();
+ final long currentTimeMs = time.milliseconds();
+ final List records = new ArrayList<>();
// Get or create the consumer group.
boolean createIfNotExists = memberEpoch == 0;
@@ -506,30 +554,47 @@ public class GroupMetadataManager {
.setClientHost(clientHost)
.build();
+ boolean bumpGroupEpoch = false;
if (!updatedMember.equals(member)) {
records.add(newMemberSubscriptionRecord(groupId, updatedMember));
if (!updatedMember.subscribedTopicNames().equals(member.subscribedTopicNames())) {
log.info("[GroupId " + groupId + "] Member " + memberId + " updated its subscribed topics to: " +
updatedMember.subscribedTopicNames());
+ bumpGroupEpoch = true;
+ }
- subscriptionMetadata = group.computeSubscriptionMetadata(
- member,
- updatedMember,
- topicsImage
- );
+ if (!updatedMember.subscribedTopicRegex().equals(member.subscribedTopicRegex())) {
+ log.info("[GroupId " + groupId + "] Member " + memberId + " updated its subscribed regex to: " +
+ updatedMember.subscribedTopicRegex());
+ bumpGroupEpoch = true;
+ }
+ }
- if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
- log.info("[GroupId " + groupId + "] Computed new subscription metadata: "
- + subscriptionMetadata + ".");
- records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
- }
+ if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
+ // The subscription metadata is updated in two cases:
+ // 1) The member has updated its subscriptions;
+ // 2) The refresh deadline has been reached.
+ subscriptionMetadata = group.computeSubscriptionMetadata(
+ member,
+ updatedMember,
+ metadataImage.topics()
+ );
+ if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+ log.info("[GroupId " + groupId + "] Computed new subscription metadata: "
+ + subscriptionMetadata + ".");
+ bumpGroupEpoch = true;
+ records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
+ }
+
+ if (bumpGroupEpoch) {
groupEpoch += 1;
records.add(newGroupEpochRecord(groupId, groupEpoch));
-
log.info("[GroupId " + groupId + "] Bumped group epoch to " + groupEpoch + ".");
}
+
+ group.setMetadataRefreshDeadline(currentTimeMs + consumerGroupMetadataRefreshIntervalMs, groupEpoch);
}
// 2. Update the target assignment if the group epoch is larger than the target assignment epoch. The
@@ -635,7 +700,7 @@ public class GroupMetadataManager {
Map subscriptionMetadata = group.computeSubscriptionMetadata(
member,
null,
- topicsImage
+ metadataImage.topics()
);
if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
@@ -709,14 +774,15 @@ public class GroupMetadataManager {
String groupId = key.groupId();
String memberId = key.memberId();
+ ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, value != null);
+ Set oldSubscribedTopicNames = new HashSet<>(consumerGroup.subscribedTopicNames());
+
if (value != null) {
- ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, true);
ConsumerGroupMember oldMember = consumerGroup.getOrMaybeCreateMember(memberId, true);
consumerGroup.updateMember(new ConsumerGroupMember.Builder(oldMember)
.updateWith(value)
.build());
} else {
- ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, false);
ConsumerGroupMember oldMember = consumerGroup.getOrMaybeCreateMember(memberId, false);
if (oldMember.memberEpoch() != -1) {
throw new IllegalStateException("Received a tombstone record to delete member " + memberId
@@ -728,6 +794,81 @@ public class GroupMetadataManager {
}
consumerGroup.removeMember(memberId);
}
+
+ updateGroupsByTopics(groupId, oldSubscribedTopicNames, consumerGroup.subscribedTopicNames());
+ }
+
+ /**
+ * @return The set of groups subscribed to the topic.
+ */
+ public Set groupsSubscribedToTopic(String topicName) {
+ Set groups = groupsByTopics.get(topicName);
+ return groups != null ? groups : Collections.emptySet();
+ }
+
+ /**
+ * Subscribes a group to a topic.
+ *
+ * @param groupId The group id.
+ * @param topicName The topic name.
+ */
+ private void subscribeGroupToTopic(
+ String groupId,
+ String topicName
+ ) {
+ groupsByTopics
+ .computeIfAbsent(topicName, __ -> new TimelineHashSet<>(snapshotRegistry, 1))
+ .add(groupId);
+ }
+
+ /**
+ * Unsubscribes a group from a topic.
+ *
+ * @param groupId The group id.
+ * @param topicName The topic name.
+ */
+ private void unsubscribeGroupFromTopic(
+ String groupId,
+ String topicName
+ ) {
+ groupsByTopics.computeIfPresent(topicName, (__, groupIds) -> {
+ groupIds.remove(groupId);
+ return groupIds.isEmpty() ? null : groupIds;
+ });
+ }
+
+ /**
+ * Updates the group by topics mapping.
+ *
+ * @param groupId The group id.
+ * @param oldSubscribedTopics The old group subscriptions.
+ * @param newSubscribedTopics The new group subscriptions.
+ */
+ private void updateGroupsByTopics(
+ String groupId,
+ Set oldSubscribedTopics,
+ Set newSubscribedTopics
+ ) {
+ if (oldSubscribedTopics.isEmpty()) {
+ newSubscribedTopics.forEach(topicName ->
+ subscribeGroupToTopic(groupId, topicName)
+ );
+ } else if (newSubscribedTopics.isEmpty()) {
+ oldSubscribedTopics.forEach(topicName ->
+ unsubscribeGroupFromTopic(groupId, topicName)
+ );
+ } else {
+ oldSubscribedTopics.forEach(topicName -> {
+ if (!newSubscribedTopics.contains(topicName)) {
+ unsubscribeGroupFromTopic(groupId, topicName);
+ }
+ });
+ newSubscribedTopics.forEach(topicName -> {
+ if (!oldSubscribedTopics.contains(topicName)) {
+ subscribeGroupToTopic(groupId, topicName);
+ }
+ });
+ }
}
/**
@@ -874,4 +1015,32 @@ public class GroupMetadataManager {
consumerGroup.updateMember(newMember);
}
}
+
+ /**
+ * A new metadata image is available.
+ *
+ * @param newImage The new metadata image.
+ * @param delta The delta image.
+ */
+ public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {
+ metadataImage = newImage;
+
+ // Notify all the groups subscribed to the created, updated or
+ // deleted topics.
+ Set allGroupIds = new HashSet<>();
+ delta.topicsDelta().changedTopics().forEach((topicId, topicDelta) -> {
+ String topicName = topicDelta.name();
+ allGroupIds.addAll(groupsSubscribedToTopic(topicName));
+ });
+ delta.topicsDelta().deletedTopicIds().forEach(topicId -> {
+ TopicImage topicImage = delta.image().topics().getTopic(topicId);
+ allGroupIds.addAll(groupsSubscribedToTopic(topicImage.name()));
+ });
+ allGroupIds.forEach(groupId -> {
+ Group group = groups.get(groupId);
+ if (group != null && group.type() == Group.GroupType.CONSUMER) {
+ ((ConsumerGroup) group).requestMetadataRefresh();
+ }
+ });
+ }
}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinator.java
index 89d08d450a2..201da405e6d 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinator.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ReplicatedGroupCoordinator.java
@@ -36,6 +36,8 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmen
import org.apache.kafka.coordinator.group.runtime.Coordinator;
import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilder;
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
@@ -130,6 +132,28 @@ public class ReplicatedGroupCoordinator implements Coordinator {
return groupMetadataManager.consumerGroupHeartbeat(context, request);
}
+ /**
+ * The coordinator has been loaded. This is used to apply any
+ * post loading operations (e.g. registering timers).
+ *
+ * @param newImage The metadata image.
+ */
+ @Override
+ public void onLoaded(MetadataImage newImage) {
+ groupMetadataManager.onNewMetadataImage(newImage, new MetadataDelta(newImage));
+ }
+
+ /**
+ * A new metadata image is available.
+ *
+ * @param newImage The new metadata image.
+ * @param delta The delta image.
+ */
+ @Override
+ public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {
+ groupMetadataManager.onNewMetadataImage(newImage, delta);
+ }
+
/**
* @return The ApiMessage or null.
*/
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
index 9df00733b30..6f682d4e82f 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
@@ -58,6 +58,18 @@ public class ConsumerGroup implements Group {
}
}
+ public static class DeadlineAndEpoch {
+ static final DeadlineAndEpoch EMPTY = new DeadlineAndEpoch(0L, 0);
+
+ public final long deadlineMs;
+ public final int epoch;
+
+ DeadlineAndEpoch(long deadlineMs, int epoch) {
+ this.deadlineMs = deadlineMs;
+ this.epoch = epoch;
+ }
+ }
+
/**
* The snapshot registry.
*/
@@ -119,6 +131,18 @@ public class ConsumerGroup implements Group {
*/
private final TimelineHashMap> currentPartitionEpoch;
+ /**
+ * The metadata refresh deadline. It consists of a timestamp in milliseconds together with
+ * the group epoch at the time of setting it. The metadata refresh time is considered as a
+ * soft state (read that it is not stored in a timeline data structure). It is like this
+ * because it is not persisted to the log. The group epoch is here to ensure that the
+ * metadata refresh deadline is invalidated if the group epoch does not correspond to
+ * the current group epoch. This can happen if the metadata refresh deadline is updated
+ * after having refreshed the metadata but the write operation failed. In this case, the
+ * deadline is not automatically rolled back.
+ */
+ private DeadlineAndEpoch metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
+
public ConsumerGroup(
SnapshotRegistry snapshotRegistry,
String groupId
@@ -249,8 +273,10 @@ public class ConsumerGroup implements Group {
* @param memberId The member id to remove.
*/
public void removeMember(String memberId) {
- ConsumerGroupMember member = members.remove(memberId);
- maybeRemovePartitionEpoch(member);
+ ConsumerGroupMember oldMember = members.remove(memberId);
+ maybeUpdateSubscribedTopicNames(oldMember, null);
+ maybeUpdateServerAssignors(oldMember, null);
+ maybeRemovePartitionEpoch(oldMember);
maybeUpdateGroupState();
}
@@ -279,6 +305,13 @@ public class ConsumerGroup implements Group {
return Collections.unmodifiableMap(members);
}
+ /**
+ * @return An immutable Set containing all the subscribed topic names.
+ */
+ public Set subscribedTopicNames() {
+ return Collections.unmodifiableSet(subscribedTopicNames.keySet());
+ }
+
/**
* Returns the target assignment of the member.
*
@@ -423,6 +456,47 @@ public class ConsumerGroup implements Group {
return Collections.unmodifiableMap(newSubscriptionMetadata);
}
+ /**
+ * Updates the metadata refresh deadline.
+ *
+ * @param deadlineMs The deadline in milliseconds.
+ * @param groupEpoch The associated group epoch.
+ */
+ public void setMetadataRefreshDeadline(
+ long deadlineMs,
+ int groupEpoch
+ ) {
+ this.metadataRefreshDeadline = new DeadlineAndEpoch(deadlineMs, groupEpoch);
+ }
+
+ /**
+ * Requests a metadata refresh.
+ */
+ public void requestMetadataRefresh() {
+ this.metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
+ }
+
+ /**
+ * Checks if a metadata refresh is required. A refresh is required in two cases:
+ * 1) The deadline is smaller or equal to the current time;
+ * 2) The group epoch associated with the deadline is larger than
+ * the current group epoch. This means that the write operation which updated
+ * the deadline failed.
+ *
+ * @param currentTimeMs The current time in milliseconds.
+ * @return A boolean indicating whether a refresh is required or not.
+ */
+ public boolean hasMetadataExpired(long currentTimeMs) {
+ return currentTimeMs >= metadataRefreshDeadline.deadlineMs || groupEpoch() < metadataRefreshDeadline.epoch;
+ }
+
+ /**
+ * @return The metadata refresh deadline.
+ */
+ public DeadlineAndEpoch metadataRefreshDeadline() {
+ return metadataRefreshDeadline;
+ }
+
/**
* Updates the current state of the group.
*/
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Coordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Coordinator.java
index 5ab88a0efa6..8189e1ab89b 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Coordinator.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Coordinator.java
@@ -16,6 +16,9 @@
*/
package org.apache.kafka.coordinator.group.runtime;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+
/**
* Coordinator is basically a replicated state machine managed by the
* {@link CoordinatorRuntime}.
@@ -25,8 +28,19 @@ public interface Coordinator extends CoordinatorPlayback {
/**
* The coordinator has been loaded. This is used to apply any
* post loading operations (e.g. registering timers).
+ *
+ * @param newImage The metadata image.
*/
- default void onLoaded() {}
+ default void onLoaded(MetadataImage newImage) {}
+
+ /**
+ * A new metadata image is available. This is only called after {@link Coordinator#onLoaded(MetadataImage)}
+ * is called to signal that the coordinator has been fully loaded.
+ *
+ * @param newImage The new metadata image.
+ * @param delta The delta image.
+ */
+ default void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {}
/**
* The coordinator has been unloaded. This is used to apply
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
index a18964cc3bb..00c8937f436 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
@@ -24,6 +24,8 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.deferred.DeferredEvent;
import org.apache.kafka.deferred.DeferredEventQueue;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.slf4j.Logger;
@@ -349,7 +351,7 @@ public class CoordinatorRuntime, U> implements AutoClos
state = CoordinatorState.ACTIVE;
snapshotRegistry.getOrCreateSnapshot(0);
partitionWriter.registerListener(tp, highWatermarklistener);
- coordinator.onLoaded();
+ coordinator.onLoaded(metadataImage);
break;
case FAILED:
@@ -807,6 +809,11 @@ public class CoordinatorRuntime, U> implements AutoClos
*/
private final AtomicBoolean isRunning = new AtomicBoolean(true);
+ /**
+ * The latest known metadata image.
+ */
+ private volatile MetadataImage metadataImage = MetadataImage.EMPTY;
+
/**
* Constructor.
*
@@ -1083,6 +1090,37 @@ public class CoordinatorRuntime, U> implements AutoClos
});
}
+ /**
+ * A new metadata image is available.
+ *
+ * @param newImage The new metadata image.
+ * @param delta The metadata delta.
+ */
+ public void onNewMetadataImage(
+ MetadataImage newImage,
+ MetadataDelta delta
+ ) {
+ throwIfNotRunning();
+ log.debug("Scheduling applying of a new metadata image with offset {}.", newImage.offset());
+
+ // Update global image.
+ metadataImage = newImage;
+
+ // Push an event for each coordinator.
+ coordinators.keySet().forEach(tp -> {
+ scheduleInternalOperation("UpdateImage(tp=" + tp + ", offset=" + newImage.offset() + ")", tp, () -> {
+ CoordinatorContext context = contextOrThrow(tp);
+ if (context.state == CoordinatorState.ACTIVE) {
+ log.debug("Applying new metadata image with offset {} to {}.", newImage.offset(), tp);
+ context.coordinator.onNewMetadataImage(newImage, delta);
+ } else {
+ log.debug("Ignoring new metadata image with offset {} for {} because the coordinator is not active.",
+ newImage.offset(), tp);
+ }
+ });
+ });
+ }
+
/**
* Closes the runtime. This closes all the coordinators currently registered
* in the runtime.
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 2296d58ecd7..41c2b1cb8be 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.UnsupportedAssignorException;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.network.ListenerName;
@@ -37,6 +38,8 @@ import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
@@ -59,8 +62,10 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmen
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.TopicImage;
-import org.apache.kafka.image.TopicsDelta;
import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
@@ -78,13 +83,16 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -113,10 +121,10 @@ public class GroupMetadataManagerTest {
}
}
- public static class TopicsImageBuilder {
- private TopicsDelta delta = new TopicsDelta(TopicsImage.EMPTY);
+ public static class MetadataImageBuilder {
+ private MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
- public TopicsImageBuilder addTopic(
+ public MetadataImageBuilder addTopic(
Uuid topicId,
String topicName,
int numPartitions
@@ -130,8 +138,8 @@ public class GroupMetadataManagerTest {
return this;
}
- public TopicsImage build() {
- return delta.apply();
+ public MetadataImage build() {
+ return delta.apply(MetadataProvenance.EMPTY);
}
}
@@ -141,6 +149,7 @@ public class GroupMetadataManagerTest {
private int assignmentEpoch;
private final Map members = new HashMap<>();
private final Map assignments = new HashMap<>();
+ private Map subscriptionMetadata;
public ConsumerGroupBuilder(String groupId, int groupEpoch) {
this.groupId = groupId;
@@ -153,6 +162,11 @@ public class GroupMetadataManagerTest {
return this;
}
+ public ConsumerGroupBuilder withSubscriptionMetadata(Map subscriptionMetadata) {
+ this.subscriptionMetadata = subscriptionMetadata;
+ return this;
+ }
+
public ConsumerGroupBuilder withAssignment(String memberId, Map> assignment) {
this.assignments.put(memberId, new Assignment(assignment));
return this;
@@ -172,19 +186,21 @@ public class GroupMetadataManagerTest {
});
// Add subscription metadata.
- Map subscriptionMetadata = new HashMap<>();
- members.forEach((memberId, member) -> {
- member.subscribedTopicNames().forEach(topicName -> {
- TopicImage topicImage = topicsImage.getTopic(topicName);
- if (topicImage != null) {
- subscriptionMetadata.put(topicName, new TopicMetadata(
- topicImage.id(),
- topicImage.name(),
- topicImage.partitions().size()
- ));
- }
+ if (subscriptionMetadata == null) {
+ subscriptionMetadata = new HashMap<>();
+ members.forEach((memberId, member) -> {
+ member.subscribedTopicNames().forEach(topicName -> {
+ TopicImage topicImage = topicsImage.getTopic(topicName);
+ if (topicImage != null) {
+ subscriptionMetadata.put(topicName, new TopicMetadata(
+ topicImage.id(),
+ topicImage.name(),
+ topicImage.partitions().size()
+ ));
+ }
+ });
});
- });
+ }
if (!subscriptionMetadata.isEmpty()) {
records.add(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
@@ -212,15 +228,17 @@ public class GroupMetadataManagerTest {
static class GroupMetadataManagerTestContext {
static class Builder {
+ final private Time time = new MockTime();
final private LogContext logContext = new LogContext();
final private SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
- private TopicsImage topicsImage;
+ private MetadataImage metadataImage;
private List assignors;
private List consumerGroupBuilders = new ArrayList<>();
private int consumerGroupMaxSize = Integer.MAX_VALUE;
+ private int consumerGroupMetadataRefreshIntervalMs = Integer.MAX_VALUE;
- public Builder withTopicsImage(TopicsImage topicsImage) {
- this.topicsImage = topicsImage;
+ public Builder withMetadataImage(MetadataImage metadataImage) {
+ this.metadataImage = metadataImage;
return this;
}
@@ -239,24 +257,32 @@ public class GroupMetadataManagerTest {
return this;
}
+ public Builder withConsumerGroupMetadataRefreshIntervalMs(int consumerGroupMetadataRefreshIntervalMs) {
+ this.consumerGroupMetadataRefreshIntervalMs = consumerGroupMetadataRefreshIntervalMs;
+ return this;
+ }
+
public GroupMetadataManagerTestContext build() {
- if (topicsImage == null) topicsImage = TopicsImage.EMPTY;
+ if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
if (assignors == null) assignors = Collections.emptyList();
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext(
+ time,
snapshotRegistry,
new GroupMetadataManager.Builder()
.withSnapshotRegistry(snapshotRegistry)
.withLogContext(logContext)
- .withTopicsImage(topicsImage)
+ .withTime(time)
+ .withMetadataImage(metadataImage)
.withConsumerGroupHeartbeatInterval(5000)
.withConsumerGroupMaxSize(consumerGroupMaxSize)
.withAssignors(assignors)
+ .withConsumerGroupMetadataRefreshIntervalMs(consumerGroupMetadataRefreshIntervalMs)
.build()
);
consumerGroupBuilders.forEach(builder -> {
- builder.build(topicsImage).forEach(context::replay);
+ builder.build(metadataImage.topics()).forEach(context::replay);
});
context.commit();
@@ -265,6 +291,7 @@ public class GroupMetadataManagerTest {
}
}
+ final Time time;
final SnapshotRegistry snapshotRegistry;
final GroupMetadataManager groupMetadataManager;
@@ -272,9 +299,11 @@ public class GroupMetadataManagerTest {
long lastWrittenOffset = 0L;
public GroupMetadataManagerTestContext(
+ Time time,
SnapshotRegistry snapshotRegistry,
GroupMetadataManager groupMetadataManager
) {
+ this.time = time;
this.snapshotRegistry = snapshotRegistry;
this.groupMetadataManager = groupMetadataManager;
}
@@ -486,7 +515,7 @@ public class GroupMetadataManagerTest {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withAssignors(Collections.singletonList(assignor))
- .withTopicsImage(TopicsImage.EMPTY)
+ .withMetadataImage(MetadataImage.EMPTY)
.build();
assignor.prepareGroupAssignment(new GroupAssignment(
@@ -675,7 +704,7 @@ public class GroupMetadataManagerTest {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withAssignors(Collections.singletonList(assignor))
- .withTopicsImage(new TopicsImageBuilder()
+ .withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.build())
@@ -764,7 +793,7 @@ public class GroupMetadataManagerTest {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withAssignors(Collections.singletonList(assignor))
- .withTopicsImage(new TopicsImageBuilder()
+ .withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.build())
@@ -865,7 +894,7 @@ public class GroupMetadataManagerTest {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withAssignors(Collections.singletonList(assignor))
- .withTopicsImage(new TopicsImageBuilder()
+ .withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.build())
@@ -1009,7 +1038,7 @@ public class GroupMetadataManagerTest {
// Consumer group with two members.
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withAssignors(Collections.singletonList(assignor))
- .withTopicsImage(new TopicsImageBuilder()
+ .withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.addTopic(zarTopicId, zarTopicName, 1)
@@ -1100,7 +1129,7 @@ public class GroupMetadataManagerTest {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withAssignors(Collections.singletonList(assignor))
- .withTopicsImage(new TopicsImageBuilder()
+ .withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.build())
@@ -1546,7 +1575,7 @@ public class GroupMetadataManagerTest {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withAssignors(Collections.singletonList(assignor))
- .withTopicsImage(new TopicsImageBuilder()
+ .withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.build())
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
@@ -1794,7 +1823,7 @@ public class GroupMetadataManagerTest {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withAssignors(Collections.singletonList(assignor))
- .withTopicsImage(new TopicsImageBuilder()
+ .withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.build())
@@ -1912,7 +1941,7 @@ public class GroupMetadataManagerTest {
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
.withAssignors(Collections.singletonList(assignor))
- .withTopicsImage(new TopicsImageBuilder()
+ .withMetadataImage(new MetadataImageBuilder()
.addTopic(fooTopicId, fooTopicName, 6)
.addTopic(barTopicId, barTopicName, 3)
.build())
@@ -1932,6 +1961,435 @@ public class GroupMetadataManagerTest {
.setTopicPartitions(Collections.emptyList())));
}
+ @Test
+ public void testSubscriptionMetadataRefreshedAfterGroupIsLoaded() {
+ String groupId = "fooup";
+ // Use a static member id as it makes the test easier.
+ String memberId = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ // Create a context with one consumer group containing one member.
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .withAssignors(Collections.singletonList(assignor))
+ .withConsumerGroupMetadataRefreshIntervalMs(5 * 60 * 1000)
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .build())
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberId)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setTargetMemberEpoch(10)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)))
+ .build())
+ .withAssignment(memberId, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)))
+ .withAssignmentEpoch(10)
+ .withSubscriptionMetadata(new HashMap() {
+ {
+ // foo only has 3 partitions stored in the metadata but foo has
+ // 6 partitions in the metadata image.
+ put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3));
+ }
+ }))
+ .build();
+
+ // The metadata refresh flag should be true.
+ ConsumerGroup consumerGroup = context.groupMetadataManager
+ .getOrMaybeCreateConsumerGroup(groupId, false);
+ assertTrue(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
+
+ // Prepare the assignment result.
+ assignor.prepareGroupAssignment(new GroupAssignment(
+ Collections.singletonMap(memberId, new MemberAssignment(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+ )))
+ ));
+
+ // Heartbeat.
+ CoordinatorResult result = context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(10));
+
+ // The member gets partitions 3, 4 and 5 assigned.
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId)
+ .setMemberEpoch(11)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+ .setAssignedTopicPartitions(Arrays.asList(
+ new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(Arrays.asList(0, 1, 2, 3, 4, 5))
+ ))),
+ result.response()
+ );
+
+ ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(memberId)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setTargetMemberEpoch(11)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+ .build();
+
+ List expectedRecords = Arrays.asList(
+ RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap() {
+ {
+ put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6));
+ }
+ }),
+ RecordHelpers.newGroupEpochRecord(groupId, 11),
+ RecordHelpers.newTargetAssignmentRecord(groupId, memberId, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+ )),
+ RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11),
+ RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember)
+ );
+
+ assertRecordsEquals(expectedRecords, result.records());
+
+ // Check next refresh time.
+ assertFalse(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
+ assertEquals(context.time.milliseconds() + 5 * 60 * 1000, consumerGroup.metadataRefreshDeadline().deadlineMs);
+ assertEquals(11, consumerGroup.metadataRefreshDeadline().epoch);
+ }
+
+ @Test
+ public void testSubscriptionMetadataRefreshedAgainAfterWriteFailure() {
+ String groupId = "fooup";
+ // Use a static member id as it makes the test easier.
+ String memberId = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ // Create a context with one consumer group containing one member.
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .withAssignors(Collections.singletonList(assignor))
+ .withConsumerGroupMetadataRefreshIntervalMs(5 * 60 * 1000)
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .build())
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberId)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setTargetMemberEpoch(10)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)))
+ .build())
+ .withAssignment(memberId, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)))
+ .withAssignmentEpoch(10)
+ .withSubscriptionMetadata(new HashMap() {
+ {
+ // foo only has 3 partitions stored in the metadata but foo has
+ // 6 partitions in the metadata image.
+ put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3));
+ }
+ }))
+ .build();
+
+ // The metadata refresh flag should be true.
+ ConsumerGroup consumerGroup = context.groupMetadataManager
+ .getOrMaybeCreateConsumerGroup(groupId, false);
+ assertTrue(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
+
+ // Prepare the assignment result.
+ assignor.prepareGroupAssignment(new GroupAssignment(
+ Collections.singletonMap(memberId, new MemberAssignment(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+ )))
+ ));
+
+ // Heartbeat.
+ context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(10));
+
+ // The metadata refresh flag is set to a future time.
+ assertFalse(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
+ assertEquals(context.time.milliseconds() + 5 * 60 * 1000, consumerGroup.metadataRefreshDeadline().deadlineMs);
+ assertEquals(11, consumerGroup.metadataRefreshDeadline().epoch);
+
+ // Roll back the uncommitted changes. This does not roll back the metadata flag
+ // because it is not using a timeline data structure.
+ context.rollback();
+
+ // However, the next heartbeat should detect the divergence based on the epoch and trigger
+ // a metadata refresh.
+ CoordinatorResult result = context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(10));
+
+
+ // The member gets partitions 3, 4 and 5 assigned.
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId)
+ .setMemberEpoch(11)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+ .setAssignedTopicPartitions(Arrays.asList(
+ new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(Arrays.asList(0, 1, 2, 3, 4, 5))
+ ))),
+ result.response()
+ );
+
+ ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(memberId)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setTargetMemberEpoch(11)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+ .build();
+
+ List expectedRecords = Arrays.asList(
+ RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap() {
+ {
+ put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6));
+ }
+ }),
+ RecordHelpers.newGroupEpochRecord(groupId, 11),
+ RecordHelpers.newTargetAssignmentRecord(groupId, memberId, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+ )),
+ RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11),
+ RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember)
+ );
+
+ assertRecordsEquals(expectedRecords, result.records());
+
+ // Check next refresh time.
+ assertFalse(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
+ assertEquals(context.time.milliseconds() + 5 * 60 * 1000, consumerGroup.metadataRefreshDeadline().deadlineMs);
+ assertEquals(11, consumerGroup.metadataRefreshDeadline().epoch);
+ }
+
+ @Test
+ public void testGroupIdsByTopics() {
+ String groupId1 = "group1";
+ String groupId2 = "group2";
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .withAssignors(Collections.singletonList(assignor))
+ .build();
+
+ assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+ assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+ assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+ // M1 in group 1 subscribes to foo and bar.
+ context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId1,
+ new ConsumerGroupMember.Builder("group1-m1")
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .build()));
+
+ assertEquals(mkSet(groupId1), context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+ assertEquals(mkSet(groupId1), context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+ assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+ // M1 in group 2 subscribes to foo, bar and zar.
+ context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId2,
+ new ConsumerGroupMember.Builder("group2-m1")
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar"))
+ .build()));
+
+ assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+ assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+ assertEquals(mkSet(groupId2), context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+ // M2 in group 1 subscribes to bar and zar.
+ context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId1,
+ new ConsumerGroupMember.Builder("group1-m2")
+ .setSubscribedTopicNames(Arrays.asList("bar", "zar"))
+ .build()));
+
+ assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+ assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+ assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+ // M2 in group 2 subscribes to foo and bar.
+ context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId2,
+ new ConsumerGroupMember.Builder("group2-m2")
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .build()));
+
+ assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+ assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+ assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+ // M1 in group 1 is removed.
+ context.replay(RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId1, "group1-m1"));
+ context.replay(RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId1, "group1-m1"));
+
+ assertEquals(mkSet(groupId2), context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+ assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+ assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+ // M1 in group 2 subscribes to nothing.
+ context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId2,
+ new ConsumerGroupMember.Builder("group2-m1")
+ .setSubscribedTopicNames(Collections.emptyList())
+ .build()));
+
+ assertEquals(mkSet(groupId2), context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+ assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+ assertEquals(mkSet(groupId1), context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+ // M2 in group 2 subscribes to foo.
+ context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId2,
+ new ConsumerGroupMember.Builder("group2-m2")
+ .setSubscribedTopicNames(Arrays.asList("foo"))
+ .build()));
+
+ assertEquals(mkSet(groupId2), context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+ assertEquals(mkSet(groupId1), context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+ assertEquals(mkSet(groupId1), context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+ // M2 in group 2 subscribes to nothing.
+ context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId2,
+ new ConsumerGroupMember.Builder("group2-m2")
+ .setSubscribedTopicNames(Collections.emptyList())
+ .build()));
+
+ assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+ assertEquals(mkSet(groupId1), context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+ assertEquals(mkSet(groupId1), context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+ // M2 in group 1 subscribes to nothing.
+ context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId1,
+ new ConsumerGroupMember.Builder("group1-m2")
+ .setSubscribedTopicNames(Collections.emptyList())
+ .build()));
+
+ assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+ assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+ assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+ }
+
+ @Test
+ public void testOnNewMetadataImage() {
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .withAssignors(Collections.singletonList(new MockPartitionAssignor("range")))
+ .build();
+
+ // M1 in group 1 subscribes to a and b.
+ context.replay(RecordHelpers.newMemberSubscriptionRecord("group1",
+ new ConsumerGroupMember.Builder("group1-m1")
+ .setSubscribedTopicNames(Arrays.asList("a", "b"))
+ .build()));
+
+ // M1 in group 2 subscribes to b and c.
+ context.replay(RecordHelpers.newMemberSubscriptionRecord("group2",
+ new ConsumerGroupMember.Builder("group2-m1")
+ .setSubscribedTopicNames(Arrays.asList("b", "c"))
+ .build()));
+
+ // M1 in group 3 subscribes to d.
+ context.replay(RecordHelpers.newMemberSubscriptionRecord("group3",
+ new ConsumerGroupMember.Builder("group3-m1")
+ .setSubscribedTopicNames(Arrays.asList("d"))
+ .build()));
+
+ // M1 in group 4 subscribes to e.
+ context.replay(RecordHelpers.newMemberSubscriptionRecord("group4",
+ new ConsumerGroupMember.Builder("group4-m1")
+ .setSubscribedTopicNames(Arrays.asList("e"))
+ .build()));
+
+ // M1 in group 5 subscribes to f.
+ context.replay(RecordHelpers.newMemberSubscriptionRecord("group5",
+ new ConsumerGroupMember.Builder("group5-m1")
+ .setSubscribedTopicNames(Arrays.asList("f"))
+ .build()));
+
+ // Ensures that all refresh flags are set to the future.
+ Arrays.asList("group1", "group2", "group3", "group4", "group5").forEach(groupId -> {
+ ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);
+ group.setMetadataRefreshDeadline(context.time.milliseconds() + 5000L, 0);
+ assertFalse(group.hasMetadataExpired(context.time.milliseconds()));
+ });
+
+ // Update the metadata image.
+ Uuid topicA = Uuid.randomUuid();
+ Uuid topicB = Uuid.randomUuid();
+ Uuid topicC = Uuid.randomUuid();
+ Uuid topicD = Uuid.randomUuid();
+ Uuid topicE = Uuid.randomUuid();
+
+ // Create a first base image with topic a, b, c and d.
+ MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+ delta.replay(new TopicRecord().setTopicId(topicA).setName("a"));
+ delta.replay(new PartitionRecord().setTopicId(topicA).setPartitionId(0));
+ delta.replay(new TopicRecord().setTopicId(topicB).setName("b"));
+ delta.replay(new PartitionRecord().setTopicId(topicB).setPartitionId(0));
+ delta.replay(new TopicRecord().setTopicId(topicC).setName("c"));
+ delta.replay(new PartitionRecord().setTopicId(topicC).setPartitionId(0));
+ delta.replay(new TopicRecord().setTopicId(topicD).setName("d"));
+ delta.replay(new PartitionRecord().setTopicId(topicD).setPartitionId(0));
+ MetadataImage image = delta.apply(MetadataProvenance.EMPTY);
+
+ // Create a delta which updates topic B, deletes topic D and creates topic E.
+ delta = new MetadataDelta(image);
+ delta.replay(new PartitionRecord().setTopicId(topicB).setPartitionId(2));
+ delta.replay(new RemoveTopicRecord().setTopicId(topicD));
+ delta.replay(new TopicRecord().setTopicId(topicE).setName("e"));
+ delta.replay(new PartitionRecord().setTopicId(topicE).setPartitionId(1));
+ image = delta.apply(MetadataProvenance.EMPTY);
+
+ // Update metadata image with the delta.
+ context.groupMetadataManager.onNewMetadataImage(image, delta);
+
+ // Verify the groups.
+ Arrays.asList("group1", "group2", "group3", "group4").forEach(groupId -> {
+ ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);
+ assertTrue(group.hasMetadataExpired(context.time.milliseconds()));
+ });
+
+ Arrays.asList("group5").forEach(groupId -> {
+ ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);
+ assertFalse(group.hasMetadataExpired(context.time.milliseconds()));
+ });
+
+ // Verify image.
+ assertEquals(image, context.groupMetadataManager.image());
+ }
+
private void assertUnorderedListEquals(
List expected,
List actual
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
index 2454188ed94..7306e3a3c5d 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
@@ -19,8 +19,9 @@ package org.apache.kafka.coordinator.group.consumer;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.coordinator.group.GroupMetadataManagerTest;
-import org.apache.kafka.image.TopicsImage;
+import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test;
@@ -392,7 +393,7 @@ public class ConsumerGroupTest {
Uuid barTopicId = Uuid.randomUuid();
Uuid zarTopicId = Uuid.randomUuid();
- TopicsImage image = new GroupMetadataManagerTest.TopicsImageBuilder()
+ MetadataImage image = new GroupMetadataManagerTest.MetadataImageBuilder()
.addTopic(fooTopicId, "foo", 1)
.addTopic(barTopicId, "bar", 2)
.addTopic(zarTopicId, "zar", 3)
@@ -416,7 +417,7 @@ public class ConsumerGroupTest {
consumerGroup.computeSubscriptionMetadata(
null,
null,
- image
+ image.topics()
)
);
@@ -428,7 +429,7 @@ public class ConsumerGroupTest {
consumerGroup.computeSubscriptionMetadata(
null,
member1,
- image
+ image.topics()
)
);
@@ -443,7 +444,7 @@ public class ConsumerGroupTest {
consumerGroup.computeSubscriptionMetadata(
null,
null,
- image
+ image.topics()
)
);
@@ -453,7 +454,7 @@ public class ConsumerGroupTest {
consumerGroup.computeSubscriptionMetadata(
member1,
null,
- image
+ image.topics()
)
);
@@ -466,7 +467,7 @@ public class ConsumerGroupTest {
consumerGroup.computeSubscriptionMetadata(
null,
member2,
- image
+ image.topics()
)
);
@@ -482,7 +483,7 @@ public class ConsumerGroupTest {
consumerGroup.computeSubscriptionMetadata(
null,
null,
- image
+ image.topics()
)
);
@@ -494,7 +495,7 @@ public class ConsumerGroupTest {
consumerGroup.computeSubscriptionMetadata(
member2,
null,
- image
+ image.topics()
)
);
@@ -506,7 +507,7 @@ public class ConsumerGroupTest {
consumerGroup.computeSubscriptionMetadata(
member1,
null,
- image
+ image.topics()
)
);
@@ -520,7 +521,7 @@ public class ConsumerGroupTest {
consumerGroup.computeSubscriptionMetadata(
null,
member3,
- image
+ image.topics()
)
);
@@ -537,8 +538,57 @@ public class ConsumerGroupTest {
consumerGroup.computeSubscriptionMetadata(
null,
null,
- image
+ image.topics()
)
);
}
+
+ @Test
+ public void testMetadataRefreshDeadline() {
+ MockTime time = new MockTime();
+ ConsumerGroup group = createConsumerGroup("group-foo");
+
+ // Group epoch starts at 0.
+ assertEquals(0, group.groupEpoch());
+
+ // The refresh time deadline should be empty when the group is created or loaded.
+ assertTrue(group.hasMetadataExpired(time.milliseconds()));
+ assertEquals(0L, group.metadataRefreshDeadline().deadlineMs);
+ assertEquals(0, group.metadataRefreshDeadline().epoch);
+
+ // Set the refresh deadline. The metadata remains valid because the deadline
+ // has not passed and the group epoch is correct.
+ group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch());
+ assertFalse(group.hasMetadataExpired(time.milliseconds()));
+ assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs);
+ assertEquals(group.groupEpoch(), group.metadataRefreshDeadline().epoch);
+
+ // Advance past the deadline. The metadata should have expired.
+ time.sleep(1001L);
+ assertTrue(group.hasMetadataExpired(time.milliseconds()));
+
+ // Set the refresh time deadline with a higher group epoch. The metadata is
+ // considered expired because the group epoch attached to the deadline is higher
+ // than the current group epoch.
+ group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch() + 1);
+ assertTrue(group.hasMetadataExpired(time.milliseconds()));
+ assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs);
+ assertEquals(group.groupEpoch() + 1, group.metadataRefreshDeadline().epoch);
+
+ // Advance the group epoch.
+ group.setGroupEpoch(group.groupEpoch() + 1);
+
+ // Set the refresh deadline. The metadata remains valid because the deadline
+ // has not passed and the group epoch is correct.
+ group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch());
+ assertFalse(group.hasMetadataExpired(time.milliseconds()));
+ assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs);
+ assertEquals(group.groupEpoch(), group.metadataRefreshDeadline().epoch);
+
+ // Request metadata refresh. The metadata expires immediately.
+ group.requestMetadataRefresh();
+ assertTrue(group.hasMetadataExpired(time.milliseconds()));
+ assertEquals(0L, group.metadataRefreshDeadline().deadlineMs);
+ assertEquals(0, group.metadataRefreshDeadline().epoch);
+ }
}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
index 7c40721d5ae..c130fa474ad 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
@@ -20,6 +20,9 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashSet;
import org.junit.jupiter.api.Test;
@@ -222,7 +225,7 @@ public class CoordinatorRuntimeTest {
assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);
// Verify that onLoaded is called.
- verify(coordinator, times(1)).onLoaded();
+ verify(coordinator, times(1)).onLoaded(MetadataImage.EMPTY);
// Verify that the listener is registered.
verify(writer, times(1)).registerListener(
@@ -834,4 +837,60 @@ public class CoordinatorRuntimeTest {
// Verify that the loader was closed.
verify(loader).close();
}
+
+ @Test
+ public void testOnNewMetadataImage() {
+ TopicPartition tp0 = new TopicPartition("__consumer_offsets", 0);
+ TopicPartition tp1 = new TopicPartition("__consumer_offsets", 1);
+
+ MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
+ MockPartitionWriter writer = mock(MockPartitionWriter.class);
+ MockCoordinatorBuilderSupplier supplier = mock(MockCoordinatorBuilderSupplier.class);
+ MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class);
+
+ CoordinatorRuntime runtime =
+ new CoordinatorRuntime.Builder()
+ .withLoader(loader)
+ .withEventProcessor(new MockEventProcessor())
+ .withPartitionWriter(writer)
+ .withCoordinatorBuilderSupplier(supplier)
+ .build();
+
+ MockCoordinator coordinator0 = mock(MockCoordinator.class);
+ MockCoordinator coordinator1 = mock(MockCoordinator.class);
+
+ when(supplier.get()).thenReturn(builder);
+ when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+ when(builder.withLogContext(any())).thenReturn(builder);
+ when(builder.build())
+ .thenReturn(coordinator0)
+ .thenReturn(coordinator1);
+
+ CompletableFuture future0 = new CompletableFuture<>();
+ when(loader.load(tp0, coordinator0)).thenReturn(future0);
+
+ CompletableFuture future1 = new CompletableFuture<>();
+ when(loader.load(tp1, coordinator1)).thenReturn(future1);
+
+ runtime.scheduleLoadOperation(tp0, 0);
+ runtime.scheduleLoadOperation(tp1, 0);
+
+ // Coordinator 0 is loaded. It should get the current image
+ // that is the empty one.
+ future0.complete(null);
+ verify(coordinator0).onLoaded(MetadataImage.EMPTY);
+
+ // Publish a new image.
+ MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+ MetadataImage newImage = delta.apply(MetadataProvenance.EMPTY);
+ runtime.onNewMetadataImage(newImage, delta);
+
+ // Coordinator 0 should be notified about it.
+ verify(coordinator0).onNewMetadataImage(newImage, delta);
+
+ // Coordinator 1 is loaded. It should get the current image
+ // that is the new image.
+ future1.complete(null);
+ verify(coordinator1).onLoaded(newImage);
+ }
}