mirror of https://github.com/apache/kafka.git
KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image (#13901)
This patch adds (1) the logic to propagate a new MetadataImage to the running coordinators; and (2) the logic to ensure that all the consumer groups subscribed to topics with changes will refresh their subscriptions metadata on the next heartbeat. In the mean time, it ensures that freshly loaded consumer groups also refresh their subscriptions metadata on the next heartbeat. Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
This commit is contained in:
parent
701f924352
commit
98fbd8afc7
|
@ -323,7 +323,7 @@
|
||||||
<suppress checks="CyclomaticComplexity"
|
<suppress checks="CyclomaticComplexity"
|
||||||
files="(ConsumerGroupMember|GroupMetadataManager).java"/>
|
files="(ConsumerGroupMember|GroupMetadataManager).java"/>
|
||||||
<suppress checks="MethodLength"
|
<suppress checks="MethodLength"
|
||||||
files="(ConsumerGroupTest|GroupMetadataManagerTest).java"/>
|
files="(GroupMetadataManager|ConsumerGroupTest|GroupMetadataManagerTest).java"/>
|
||||||
<suppress checks="NPathComplexity"
|
<suppress checks="NPathComplexity"
|
||||||
files="(GroupMetadataManager).java"/>
|
files="(GroupMetadataManager).java"/>
|
||||||
<suppress checks="ParameterNumber"
|
<suppress checks="ParameterNumber"
|
||||||
|
|
|
@ -531,6 +531,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
|
||||||
MetadataDelta delta
|
MetadataDelta delta
|
||||||
) {
|
) {
|
||||||
throwIfNotActive();
|
throwIfNotActive();
|
||||||
|
runtime.onNewMetadataImage(newImage, delta);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
|
||||||
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
|
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
|
||||||
import org.apache.kafka.common.requests.RequestContext;
|
import org.apache.kafka.common.requests.RequestContext;
|
||||||
import org.apache.kafka.common.utils.LogContext;
|
import org.apache.kafka.common.utils.LogContext;
|
||||||
|
import org.apache.kafka.common.utils.Time;
|
||||||
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
|
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
|
||||||
import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
|
import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
|
||||||
import org.apache.kafka.coordinator.group.consumer.Assignment;
|
import org.apache.kafka.coordinator.group.consumer.Assignment;
|
||||||
|
@ -50,14 +51,18 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmen
|
||||||
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
|
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
|
||||||
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
|
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
|
||||||
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
|
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
|
||||||
import org.apache.kafka.image.TopicsImage;
|
import org.apache.kafka.image.MetadataDelta;
|
||||||
|
import org.apache.kafka.image.MetadataImage;
|
||||||
|
import org.apache.kafka.image.TopicImage;
|
||||||
import org.apache.kafka.timeline.SnapshotRegistry;
|
import org.apache.kafka.timeline.SnapshotRegistry;
|
||||||
import org.apache.kafka.timeline.TimelineHashMap;
|
import org.apache.kafka.timeline.TimelineHashMap;
|
||||||
|
import org.apache.kafka.timeline.TimelineHashSet;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
@ -88,10 +93,12 @@ public class GroupMetadataManager {
|
||||||
public static class Builder {
|
public static class Builder {
|
||||||
private LogContext logContext = null;
|
private LogContext logContext = null;
|
||||||
private SnapshotRegistry snapshotRegistry = null;
|
private SnapshotRegistry snapshotRegistry = null;
|
||||||
|
private Time time = null;
|
||||||
private List<PartitionAssignor> assignors = null;
|
private List<PartitionAssignor> assignors = null;
|
||||||
private TopicsImage topicsImage = null;
|
|
||||||
private int consumerGroupMaxSize = Integer.MAX_VALUE;
|
private int consumerGroupMaxSize = Integer.MAX_VALUE;
|
||||||
private int consumerGroupHeartbeatIntervalMs = 5000;
|
private int consumerGroupHeartbeatIntervalMs = 5000;
|
||||||
|
private int consumerGroupMetadataRefreshIntervalMs = Integer.MAX_VALUE;
|
||||||
|
private MetadataImage metadataImage = null;
|
||||||
|
|
||||||
Builder withLogContext(LogContext logContext) {
|
Builder withLogContext(LogContext logContext) {
|
||||||
this.logContext = logContext;
|
this.logContext = logContext;
|
||||||
|
@ -103,6 +110,11 @@ public class GroupMetadataManager {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Builder withTime(Time time) {
|
||||||
|
this.time = time;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
Builder withAssignors(List<PartitionAssignor> assignors) {
|
Builder withAssignors(List<PartitionAssignor> assignors) {
|
||||||
this.assignors = assignors;
|
this.assignors = assignors;
|
||||||
return this;
|
return this;
|
||||||
|
@ -118,15 +130,21 @@ public class GroupMetadataManager {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
Builder withTopicsImage(TopicsImage topicsImage) {
|
Builder withConsumerGroupMetadataRefreshIntervalMs(int consumerGroupMetadataRefreshIntervalMs) {
|
||||||
this.topicsImage = topicsImage;
|
this.consumerGroupMetadataRefreshIntervalMs = consumerGroupMetadataRefreshIntervalMs;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
Builder withMetadataImage(MetadataImage metadataImage) {
|
||||||
|
this.metadataImage = metadataImage;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
GroupMetadataManager build() {
|
GroupMetadataManager build() {
|
||||||
if (logContext == null) logContext = new LogContext();
|
if (logContext == null) logContext = new LogContext();
|
||||||
if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
|
if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
|
||||||
if (topicsImage == null) topicsImage = TopicsImage.EMPTY;
|
if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
|
||||||
|
if (time == null) time = Time.SYSTEM;
|
||||||
|
|
||||||
if (assignors == null || assignors.isEmpty()) {
|
if (assignors == null || assignors.isEmpty()) {
|
||||||
throw new IllegalStateException("Assignors must be set before building.");
|
throw new IllegalStateException("Assignors must be set before building.");
|
||||||
|
@ -135,10 +153,12 @@ public class GroupMetadataManager {
|
||||||
return new GroupMetadataManager(
|
return new GroupMetadataManager(
|
||||||
snapshotRegistry,
|
snapshotRegistry,
|
||||||
logContext,
|
logContext,
|
||||||
|
time,
|
||||||
assignors,
|
assignors,
|
||||||
topicsImage,
|
metadataImage,
|
||||||
consumerGroupMaxSize,
|
consumerGroupMaxSize,
|
||||||
consumerGroupHeartbeatIntervalMs
|
consumerGroupHeartbeatIntervalMs,
|
||||||
|
consumerGroupMetadataRefreshIntervalMs
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -153,6 +173,11 @@ public class GroupMetadataManager {
|
||||||
*/
|
*/
|
||||||
private final SnapshotRegistry snapshotRegistry;
|
private final SnapshotRegistry snapshotRegistry;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The system time.
|
||||||
|
*/
|
||||||
|
private final Time time;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The supported partition assignors keyed by their name.
|
* The supported partition assignors keyed by their name.
|
||||||
*/
|
*/
|
||||||
|
@ -168,6 +193,11 @@ public class GroupMetadataManager {
|
||||||
*/
|
*/
|
||||||
private final TimelineHashMap<String, Group> groups;
|
private final TimelineHashMap<String, Group> groups;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The group ids keyed by topic names.
|
||||||
|
*/
|
||||||
|
private final TimelineHashMap<String, TimelineHashSet<String>> groupsByTopics;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The maximum number of members allowed in a single consumer group.
|
* The maximum number of members allowed in a single consumer group.
|
||||||
*/
|
*/
|
||||||
|
@ -179,26 +209,43 @@ public class GroupMetadataManager {
|
||||||
private final int consumerGroupHeartbeatIntervalMs;
|
private final int consumerGroupHeartbeatIntervalMs;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The topics metadata (or image).
|
* The metadata refresh interval.
|
||||||
*/
|
*/
|
||||||
private TopicsImage topicsImage;
|
private final int consumerGroupMetadataRefreshIntervalMs;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The metadata image.
|
||||||
|
*/
|
||||||
|
private MetadataImage metadataImage;
|
||||||
|
|
||||||
private GroupMetadataManager(
|
private GroupMetadataManager(
|
||||||
SnapshotRegistry snapshotRegistry,
|
SnapshotRegistry snapshotRegistry,
|
||||||
LogContext logContext,
|
LogContext logContext,
|
||||||
|
Time time,
|
||||||
List<PartitionAssignor> assignors,
|
List<PartitionAssignor> assignors,
|
||||||
TopicsImage topicsImage,
|
MetadataImage metadataImage,
|
||||||
int consumerGroupMaxSize,
|
int consumerGroupMaxSize,
|
||||||
int consumerGroupHeartbeatIntervalMs
|
int consumerGroupHeartbeatIntervalMs,
|
||||||
|
int consumerGroupMetadataRefreshIntervalMs
|
||||||
) {
|
) {
|
||||||
this.log = logContext.logger(GroupMetadataManager.class);
|
this.log = logContext.logger(GroupMetadataManager.class);
|
||||||
this.snapshotRegistry = snapshotRegistry;
|
this.snapshotRegistry = snapshotRegistry;
|
||||||
this.topicsImage = topicsImage;
|
this.time = time;
|
||||||
|
this.metadataImage = metadataImage;
|
||||||
this.assignors = assignors.stream().collect(Collectors.toMap(PartitionAssignor::name, Function.identity()));
|
this.assignors = assignors.stream().collect(Collectors.toMap(PartitionAssignor::name, Function.identity()));
|
||||||
this.defaultAssignor = assignors.get(0);
|
this.defaultAssignor = assignors.get(0);
|
||||||
this.groups = new TimelineHashMap<>(snapshotRegistry, 0);
|
this.groups = new TimelineHashMap<>(snapshotRegistry, 0);
|
||||||
|
this.groupsByTopics = new TimelineHashMap<>(snapshotRegistry, 0);
|
||||||
this.consumerGroupMaxSize = consumerGroupMaxSize;
|
this.consumerGroupMaxSize = consumerGroupMaxSize;
|
||||||
this.consumerGroupHeartbeatIntervalMs = consumerGroupHeartbeatIntervalMs;
|
this.consumerGroupHeartbeatIntervalMs = consumerGroupHeartbeatIntervalMs;
|
||||||
|
this.consumerGroupMetadataRefreshIntervalMs = consumerGroupMetadataRefreshIntervalMs;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return The current metadata image used by the group metadata manager.
|
||||||
|
*/
|
||||||
|
public MetadataImage image() {
|
||||||
|
return metadataImage;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -472,7 +519,8 @@ public class GroupMetadataManager {
|
||||||
String assignorName,
|
String assignorName,
|
||||||
List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions
|
List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions
|
||||||
) throws ApiException {
|
) throws ApiException {
|
||||||
List<Record> records = new ArrayList<>();
|
final long currentTimeMs = time.milliseconds();
|
||||||
|
final List<Record> records = new ArrayList<>();
|
||||||
|
|
||||||
// Get or create the consumer group.
|
// Get or create the consumer group.
|
||||||
boolean createIfNotExists = memberEpoch == 0;
|
boolean createIfNotExists = memberEpoch == 0;
|
||||||
|
@ -506,30 +554,47 @@ public class GroupMetadataManager {
|
||||||
.setClientHost(clientHost)
|
.setClientHost(clientHost)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
boolean bumpGroupEpoch = false;
|
||||||
if (!updatedMember.equals(member)) {
|
if (!updatedMember.equals(member)) {
|
||||||
records.add(newMemberSubscriptionRecord(groupId, updatedMember));
|
records.add(newMemberSubscriptionRecord(groupId, updatedMember));
|
||||||
|
|
||||||
if (!updatedMember.subscribedTopicNames().equals(member.subscribedTopicNames())) {
|
if (!updatedMember.subscribedTopicNames().equals(member.subscribedTopicNames())) {
|
||||||
log.info("[GroupId " + groupId + "] Member " + memberId + " updated its subscribed topics to: " +
|
log.info("[GroupId " + groupId + "] Member " + memberId + " updated its subscribed topics to: " +
|
||||||
updatedMember.subscribedTopicNames());
|
updatedMember.subscribedTopicNames());
|
||||||
|
bumpGroupEpoch = true;
|
||||||
|
}
|
||||||
|
|
||||||
subscriptionMetadata = group.computeSubscriptionMetadata(
|
if (!updatedMember.subscribedTopicRegex().equals(member.subscribedTopicRegex())) {
|
||||||
member,
|
log.info("[GroupId " + groupId + "] Member " + memberId + " updated its subscribed regex to: " +
|
||||||
updatedMember,
|
updatedMember.subscribedTopicRegex());
|
||||||
topicsImage
|
bumpGroupEpoch = true;
|
||||||
);
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
|
if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
|
||||||
log.info("[GroupId " + groupId + "] Computed new subscription metadata: "
|
// The subscription metadata is updated in two cases:
|
||||||
+ subscriptionMetadata + ".");
|
// 1) The member has updated its subscriptions;
|
||||||
records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
|
// 2) The refresh deadline has been reached.
|
||||||
}
|
subscriptionMetadata = group.computeSubscriptionMetadata(
|
||||||
|
member,
|
||||||
|
updatedMember,
|
||||||
|
metadataImage.topics()
|
||||||
|
);
|
||||||
|
|
||||||
|
if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
|
||||||
|
log.info("[GroupId " + groupId + "] Computed new subscription metadata: "
|
||||||
|
+ subscriptionMetadata + ".");
|
||||||
|
bumpGroupEpoch = true;
|
||||||
|
records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (bumpGroupEpoch) {
|
||||||
groupEpoch += 1;
|
groupEpoch += 1;
|
||||||
records.add(newGroupEpochRecord(groupId, groupEpoch));
|
records.add(newGroupEpochRecord(groupId, groupEpoch));
|
||||||
|
|
||||||
log.info("[GroupId " + groupId + "] Bumped group epoch to " + groupEpoch + ".");
|
log.info("[GroupId " + groupId + "] Bumped group epoch to " + groupEpoch + ".");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
group.setMetadataRefreshDeadline(currentTimeMs + consumerGroupMetadataRefreshIntervalMs, groupEpoch);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. Update the target assignment if the group epoch is larger than the target assignment epoch. The
|
// 2. Update the target assignment if the group epoch is larger than the target assignment epoch. The
|
||||||
|
@ -635,7 +700,7 @@ public class GroupMetadataManager {
|
||||||
Map<String, TopicMetadata> subscriptionMetadata = group.computeSubscriptionMetadata(
|
Map<String, TopicMetadata> subscriptionMetadata = group.computeSubscriptionMetadata(
|
||||||
member,
|
member,
|
||||||
null,
|
null,
|
||||||
topicsImage
|
metadataImage.topics()
|
||||||
);
|
);
|
||||||
|
|
||||||
if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
|
if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
|
||||||
|
@ -709,14 +774,15 @@ public class GroupMetadataManager {
|
||||||
String groupId = key.groupId();
|
String groupId = key.groupId();
|
||||||
String memberId = key.memberId();
|
String memberId = key.memberId();
|
||||||
|
|
||||||
|
ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, value != null);
|
||||||
|
Set<String> oldSubscribedTopicNames = new HashSet<>(consumerGroup.subscribedTopicNames());
|
||||||
|
|
||||||
if (value != null) {
|
if (value != null) {
|
||||||
ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, true);
|
|
||||||
ConsumerGroupMember oldMember = consumerGroup.getOrMaybeCreateMember(memberId, true);
|
ConsumerGroupMember oldMember = consumerGroup.getOrMaybeCreateMember(memberId, true);
|
||||||
consumerGroup.updateMember(new ConsumerGroupMember.Builder(oldMember)
|
consumerGroup.updateMember(new ConsumerGroupMember.Builder(oldMember)
|
||||||
.updateWith(value)
|
.updateWith(value)
|
||||||
.build());
|
.build());
|
||||||
} else {
|
} else {
|
||||||
ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, false);
|
|
||||||
ConsumerGroupMember oldMember = consumerGroup.getOrMaybeCreateMember(memberId, false);
|
ConsumerGroupMember oldMember = consumerGroup.getOrMaybeCreateMember(memberId, false);
|
||||||
if (oldMember.memberEpoch() != -1) {
|
if (oldMember.memberEpoch() != -1) {
|
||||||
throw new IllegalStateException("Received a tombstone record to delete member " + memberId
|
throw new IllegalStateException("Received a tombstone record to delete member " + memberId
|
||||||
|
@ -728,6 +794,81 @@ public class GroupMetadataManager {
|
||||||
}
|
}
|
||||||
consumerGroup.removeMember(memberId);
|
consumerGroup.removeMember(memberId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
updateGroupsByTopics(groupId, oldSubscribedTopicNames, consumerGroup.subscribedTopicNames());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return The set of groups subscribed to the topic.
|
||||||
|
*/
|
||||||
|
public Set<String> groupsSubscribedToTopic(String topicName) {
|
||||||
|
Set<String> groups = groupsByTopics.get(topicName);
|
||||||
|
return groups != null ? groups : Collections.emptySet();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Subscribes a group to a topic.
|
||||||
|
*
|
||||||
|
* @param groupId The group id.
|
||||||
|
* @param topicName The topic name.
|
||||||
|
*/
|
||||||
|
private void subscribeGroupToTopic(
|
||||||
|
String groupId,
|
||||||
|
String topicName
|
||||||
|
) {
|
||||||
|
groupsByTopics
|
||||||
|
.computeIfAbsent(topicName, __ -> new TimelineHashSet<>(snapshotRegistry, 1))
|
||||||
|
.add(groupId);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unsubscribes a group from a topic.
|
||||||
|
*
|
||||||
|
* @param groupId The group id.
|
||||||
|
* @param topicName The topic name.
|
||||||
|
*/
|
||||||
|
private void unsubscribeGroupFromTopic(
|
||||||
|
String groupId,
|
||||||
|
String topicName
|
||||||
|
) {
|
||||||
|
groupsByTopics.computeIfPresent(topicName, (__, groupIds) -> {
|
||||||
|
groupIds.remove(groupId);
|
||||||
|
return groupIds.isEmpty() ? null : groupIds;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Updates the group by topics mapping.
|
||||||
|
*
|
||||||
|
* @param groupId The group id.
|
||||||
|
* @param oldSubscribedTopics The old group subscriptions.
|
||||||
|
* @param newSubscribedTopics The new group subscriptions.
|
||||||
|
*/
|
||||||
|
private void updateGroupsByTopics(
|
||||||
|
String groupId,
|
||||||
|
Set<String> oldSubscribedTopics,
|
||||||
|
Set<String> newSubscribedTopics
|
||||||
|
) {
|
||||||
|
if (oldSubscribedTopics.isEmpty()) {
|
||||||
|
newSubscribedTopics.forEach(topicName ->
|
||||||
|
subscribeGroupToTopic(groupId, topicName)
|
||||||
|
);
|
||||||
|
} else if (newSubscribedTopics.isEmpty()) {
|
||||||
|
oldSubscribedTopics.forEach(topicName ->
|
||||||
|
unsubscribeGroupFromTopic(groupId, topicName)
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
oldSubscribedTopics.forEach(topicName -> {
|
||||||
|
if (!newSubscribedTopics.contains(topicName)) {
|
||||||
|
unsubscribeGroupFromTopic(groupId, topicName);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
newSubscribedTopics.forEach(topicName -> {
|
||||||
|
if (!oldSubscribedTopics.contains(topicName)) {
|
||||||
|
subscribeGroupToTopic(groupId, topicName);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -874,4 +1015,32 @@ public class GroupMetadataManager {
|
||||||
consumerGroup.updateMember(newMember);
|
consumerGroup.updateMember(newMember);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A new metadata image is available.
|
||||||
|
*
|
||||||
|
* @param newImage The new metadata image.
|
||||||
|
* @param delta The delta image.
|
||||||
|
*/
|
||||||
|
public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {
|
||||||
|
metadataImage = newImage;
|
||||||
|
|
||||||
|
// Notify all the groups subscribed to the created, updated or
|
||||||
|
// deleted topics.
|
||||||
|
Set<String> allGroupIds = new HashSet<>();
|
||||||
|
delta.topicsDelta().changedTopics().forEach((topicId, topicDelta) -> {
|
||||||
|
String topicName = topicDelta.name();
|
||||||
|
allGroupIds.addAll(groupsSubscribedToTopic(topicName));
|
||||||
|
});
|
||||||
|
delta.topicsDelta().deletedTopicIds().forEach(topicId -> {
|
||||||
|
TopicImage topicImage = delta.image().topics().getTopic(topicId);
|
||||||
|
allGroupIds.addAll(groupsSubscribedToTopic(topicImage.name()));
|
||||||
|
});
|
||||||
|
allGroupIds.forEach(groupId -> {
|
||||||
|
Group group = groups.get(groupId);
|
||||||
|
if (group != null && group.type() == Group.GroupType.CONSUMER) {
|
||||||
|
((ConsumerGroup) group).requestMetadataRefresh();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,6 +36,8 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmen
|
||||||
import org.apache.kafka.coordinator.group.runtime.Coordinator;
|
import org.apache.kafka.coordinator.group.runtime.Coordinator;
|
||||||
import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilder;
|
import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilder;
|
||||||
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
|
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
|
||||||
|
import org.apache.kafka.image.MetadataDelta;
|
||||||
|
import org.apache.kafka.image.MetadataImage;
|
||||||
import org.apache.kafka.server.common.ApiMessageAndVersion;
|
import org.apache.kafka.server.common.ApiMessageAndVersion;
|
||||||
import org.apache.kafka.timeline.SnapshotRegistry;
|
import org.apache.kafka.timeline.SnapshotRegistry;
|
||||||
|
|
||||||
|
@ -130,6 +132,28 @@ public class ReplicatedGroupCoordinator implements Coordinator<Record> {
|
||||||
return groupMetadataManager.consumerGroupHeartbeat(context, request);
|
return groupMetadataManager.consumerGroupHeartbeat(context, request);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The coordinator has been loaded. This is used to apply any
|
||||||
|
* post loading operations (e.g. registering timers).
|
||||||
|
*
|
||||||
|
* @param newImage The metadata image.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void onLoaded(MetadataImage newImage) {
|
||||||
|
groupMetadataManager.onNewMetadataImage(newImage, new MetadataDelta(newImage));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A new metadata image is available.
|
||||||
|
*
|
||||||
|
* @param newImage The new metadata image.
|
||||||
|
* @param delta The delta image.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {
|
||||||
|
groupMetadataManager.onNewMetadataImage(newImage, delta);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return The ApiMessage or null.
|
* @return The ApiMessage or null.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -58,6 +58,18 @@ public class ConsumerGroup implements Group {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class DeadlineAndEpoch {
|
||||||
|
static final DeadlineAndEpoch EMPTY = new DeadlineAndEpoch(0L, 0);
|
||||||
|
|
||||||
|
public final long deadlineMs;
|
||||||
|
public final int epoch;
|
||||||
|
|
||||||
|
DeadlineAndEpoch(long deadlineMs, int epoch) {
|
||||||
|
this.deadlineMs = deadlineMs;
|
||||||
|
this.epoch = epoch;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The snapshot registry.
|
* The snapshot registry.
|
||||||
*/
|
*/
|
||||||
|
@ -119,6 +131,18 @@ public class ConsumerGroup implements Group {
|
||||||
*/
|
*/
|
||||||
private final TimelineHashMap<Uuid, TimelineHashMap<Integer, Integer>> currentPartitionEpoch;
|
private final TimelineHashMap<Uuid, TimelineHashMap<Integer, Integer>> currentPartitionEpoch;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The metadata refresh deadline. It consists of a timestamp in milliseconds together with
|
||||||
|
* the group epoch at the time of setting it. The metadata refresh time is considered as a
|
||||||
|
* soft state (read that it is not stored in a timeline data structure). It is like this
|
||||||
|
* because it is not persisted to the log. The group epoch is here to ensure that the
|
||||||
|
* metadata refresh deadline is invalidated if the group epoch does not correspond to
|
||||||
|
* the current group epoch. This can happen if the metadata refresh deadline is updated
|
||||||
|
* after having refreshed the metadata but the write operation failed. In this case, the
|
||||||
|
* time is not automatically rolled back.
|
||||||
|
*/
|
||||||
|
private DeadlineAndEpoch metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
|
||||||
|
|
||||||
public ConsumerGroup(
|
public ConsumerGroup(
|
||||||
SnapshotRegistry snapshotRegistry,
|
SnapshotRegistry snapshotRegistry,
|
||||||
String groupId
|
String groupId
|
||||||
|
@ -249,8 +273,10 @@ public class ConsumerGroup implements Group {
|
||||||
* @param memberId The member id to remove.
|
* @param memberId The member id to remove.
|
||||||
*/
|
*/
|
||||||
public void removeMember(String memberId) {
|
public void removeMember(String memberId) {
|
||||||
ConsumerGroupMember member = members.remove(memberId);
|
ConsumerGroupMember oldMember = members.remove(memberId);
|
||||||
maybeRemovePartitionEpoch(member);
|
maybeUpdateSubscribedTopicNames(oldMember, null);
|
||||||
|
maybeUpdateServerAssignors(oldMember, null);
|
||||||
|
maybeRemovePartitionEpoch(oldMember);
|
||||||
maybeUpdateGroupState();
|
maybeUpdateGroupState();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -279,6 +305,13 @@ public class ConsumerGroup implements Group {
|
||||||
return Collections.unmodifiableMap(members);
|
return Collections.unmodifiableMap(members);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return An immutable Set containing all the subscribed topic names.
|
||||||
|
*/
|
||||||
|
public Set<String> subscribedTopicNames() {
|
||||||
|
return Collections.unmodifiableSet(subscribedTopicNames.keySet());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the target assignment of the member.
|
* Returns the target assignment of the member.
|
||||||
*
|
*
|
||||||
|
@ -423,6 +456,47 @@ public class ConsumerGroup implements Group {
|
||||||
return Collections.unmodifiableMap(newSubscriptionMetadata);
|
return Collections.unmodifiableMap(newSubscriptionMetadata);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Updates the metadata refresh deadline.
|
||||||
|
*
|
||||||
|
* @param deadlineMs The deadline in milliseconds.
|
||||||
|
* @param groupEpoch The associated group epoch.
|
||||||
|
*/
|
||||||
|
public void setMetadataRefreshDeadline(
|
||||||
|
long deadlineMs,
|
||||||
|
int groupEpoch
|
||||||
|
) {
|
||||||
|
this.metadataRefreshDeadline = new DeadlineAndEpoch(deadlineMs, groupEpoch);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Requests a metadata refresh.
|
||||||
|
*/
|
||||||
|
public void requestMetadataRefresh() {
|
||||||
|
this.metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks if a metadata refresh is required. A refresh is required in two cases:
|
||||||
|
* 1) The deadline is smaller or equal to the current time;
|
||||||
|
* 2) The group epoch associated with the deadline is larger than
|
||||||
|
* the current group epoch. This means that the operations which updated
|
||||||
|
* the deadline failed.
|
||||||
|
*
|
||||||
|
* @param currentTimeMs The current time in milliseconds.
|
||||||
|
* @return A boolean indicating whether a refresh is required or not.
|
||||||
|
*/
|
||||||
|
public boolean hasMetadataExpired(long currentTimeMs) {
|
||||||
|
return currentTimeMs >= metadataRefreshDeadline.deadlineMs || groupEpoch() < metadataRefreshDeadline.epoch;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return The metadata refresh deadline.
|
||||||
|
*/
|
||||||
|
public DeadlineAndEpoch metadataRefreshDeadline() {
|
||||||
|
return metadataRefreshDeadline;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Updates the current state of the group.
|
* Updates the current state of the group.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -16,6 +16,9 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.kafka.coordinator.group.runtime;
|
package org.apache.kafka.coordinator.group.runtime;
|
||||||
|
|
||||||
|
import org.apache.kafka.image.MetadataDelta;
|
||||||
|
import org.apache.kafka.image.MetadataImage;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Coordinator is basically a replicated state machine managed by the
|
* Coordinator is basically a replicated state machine managed by the
|
||||||
* {@link CoordinatorRuntime}.
|
* {@link CoordinatorRuntime}.
|
||||||
|
@ -25,8 +28,19 @@ public interface Coordinator<U> extends CoordinatorPlayback<U> {
|
||||||
/**
|
/**
|
||||||
* The coordinator has been loaded. This is used to apply any
|
* The coordinator has been loaded. This is used to apply any
|
||||||
* post loading operations (e.g. registering timers).
|
* post loading operations (e.g. registering timers).
|
||||||
|
*
|
||||||
|
* @param newImage The metadata image.
|
||||||
*/
|
*/
|
||||||
default void onLoaded() {}
|
default void onLoaded(MetadataImage newImage) {}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A new metadata image is available. This is only called after {@link Coordinator#onLoaded(MetadataImage)}
|
||||||
|
* is called to signal that the coordinator has been fully loaded.
|
||||||
|
*
|
||||||
|
* @param newImage The new metadata image.
|
||||||
|
* @param delta The delta image.
|
||||||
|
*/
|
||||||
|
default void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The coordinator has been unloaded. This is used to apply
|
* The coordinator has been unloaded. This is used to apply
|
||||||
|
|
|
@ -24,6 +24,8 @@ import org.apache.kafka.common.protocol.Errors;
|
||||||
import org.apache.kafka.common.utils.LogContext;
|
import org.apache.kafka.common.utils.LogContext;
|
||||||
import org.apache.kafka.deferred.DeferredEvent;
|
import org.apache.kafka.deferred.DeferredEvent;
|
||||||
import org.apache.kafka.deferred.DeferredEventQueue;
|
import org.apache.kafka.deferred.DeferredEventQueue;
|
||||||
|
import org.apache.kafka.image.MetadataDelta;
|
||||||
|
import org.apache.kafka.image.MetadataImage;
|
||||||
import org.apache.kafka.timeline.SnapshotRegistry;
|
import org.apache.kafka.timeline.SnapshotRegistry;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
|
||||||
|
@ -349,7 +351,7 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> implements AutoClos
|
||||||
state = CoordinatorState.ACTIVE;
|
state = CoordinatorState.ACTIVE;
|
||||||
snapshotRegistry.getOrCreateSnapshot(0);
|
snapshotRegistry.getOrCreateSnapshot(0);
|
||||||
partitionWriter.registerListener(tp, highWatermarklistener);
|
partitionWriter.registerListener(tp, highWatermarklistener);
|
||||||
coordinator.onLoaded();
|
coordinator.onLoaded(metadataImage);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case FAILED:
|
case FAILED:
|
||||||
|
@ -807,6 +809,11 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> implements AutoClos
|
||||||
*/
|
*/
|
||||||
private final AtomicBoolean isRunning = new AtomicBoolean(true);
|
private final AtomicBoolean isRunning = new AtomicBoolean(true);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The latest known metadata image.
|
||||||
|
*/
|
||||||
|
private volatile MetadataImage metadataImage = MetadataImage.EMPTY;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor.
|
* Constructor.
|
||||||
*
|
*
|
||||||
|
@ -1083,6 +1090,37 @@ public class CoordinatorRuntime<S extends Coordinator<U>, U> implements AutoClos
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A new metadata image is available.
|
||||||
|
*
|
||||||
|
* @param newImage The new metadata image.
|
||||||
|
* @param delta The metadata delta.
|
||||||
|
*/
|
||||||
|
public void onNewMetadataImage(
|
||||||
|
MetadataImage newImage,
|
||||||
|
MetadataDelta delta
|
||||||
|
) {
|
||||||
|
throwIfNotRunning();
|
||||||
|
log.debug("Scheduling applying of a new metadata image with offset {}.", newImage.offset());
|
||||||
|
|
||||||
|
// Update global image.
|
||||||
|
metadataImage = newImage;
|
||||||
|
|
||||||
|
// Push an event for each coordinator.
|
||||||
|
coordinators.keySet().forEach(tp -> {
|
||||||
|
scheduleInternalOperation("UpdateImage(tp=" + tp + ", offset=" + newImage.offset() + ")", tp, () -> {
|
||||||
|
CoordinatorContext context = contextOrThrow(tp);
|
||||||
|
if (context.state == CoordinatorState.ACTIVE) {
|
||||||
|
log.debug("Applying new metadata image with offset {} to {}.", newImage.offset(), tp);
|
||||||
|
context.coordinator.onNewMetadataImage(newImage, delta);
|
||||||
|
} else {
|
||||||
|
log.debug("Ignoring new metadata image with offset {} for {} because the coordinator is not active.",
|
||||||
|
newImage.offset(), tp);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Closes the runtime. This closes all the coordinators currently registered
|
* Closes the runtime. This closes all the coordinators currently registered
|
||||||
* in the runtime.
|
* in the runtime.
|
||||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.UnsupportedAssignorException;
|
||||||
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
|
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
|
||||||
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
|
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
|
||||||
import org.apache.kafka.common.metadata.PartitionRecord;
|
import org.apache.kafka.common.metadata.PartitionRecord;
|
||||||
|
import org.apache.kafka.common.metadata.RemoveTopicRecord;
|
||||||
import org.apache.kafka.common.metadata.TopicRecord;
|
import org.apache.kafka.common.metadata.TopicRecord;
|
||||||
import org.apache.kafka.common.network.ClientInformation;
|
import org.apache.kafka.common.network.ClientInformation;
|
||||||
import org.apache.kafka.common.network.ListenerName;
|
import org.apache.kafka.common.network.ListenerName;
|
||||||
|
@ -37,6 +38,8 @@ import org.apache.kafka.common.requests.RequestHeader;
|
||||||
import org.apache.kafka.common.security.auth.KafkaPrincipal;
|
import org.apache.kafka.common.security.auth.KafkaPrincipal;
|
||||||
import org.apache.kafka.common.security.auth.SecurityProtocol;
|
import org.apache.kafka.common.security.auth.SecurityProtocol;
|
||||||
import org.apache.kafka.common.utils.LogContext;
|
import org.apache.kafka.common.utils.LogContext;
|
||||||
|
import org.apache.kafka.common.utils.MockTime;
|
||||||
|
import org.apache.kafka.common.utils.Time;
|
||||||
import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
|
import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
|
||||||
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
|
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
|
||||||
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
|
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
|
||||||
|
@ -59,8 +62,10 @@ import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmen
|
||||||
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
|
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
|
||||||
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
|
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
|
||||||
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
|
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
|
||||||
|
import org.apache.kafka.image.MetadataDelta;
|
||||||
|
import org.apache.kafka.image.MetadataImage;
|
||||||
|
import org.apache.kafka.image.MetadataProvenance;
|
||||||
import org.apache.kafka.image.TopicImage;
|
import org.apache.kafka.image.TopicImage;
|
||||||
import org.apache.kafka.image.TopicsDelta;
|
|
||||||
import org.apache.kafka.image.TopicsImage;
|
import org.apache.kafka.image.TopicsImage;
|
||||||
import org.apache.kafka.server.common.ApiMessageAndVersion;
|
import org.apache.kafka.server.common.ApiMessageAndVersion;
|
||||||
import org.apache.kafka.timeline.SnapshotRegistry;
|
import org.apache.kafka.timeline.SnapshotRegistry;
|
||||||
|
@ -78,13 +83,16 @@ import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
import static org.apache.kafka.common.utils.Utils.mkSet;
|
||||||
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
|
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
|
||||||
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
|
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
|
||||||
import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure;
|
import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
@ -113,10 +121,10 @@ public class GroupMetadataManagerTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class TopicsImageBuilder {
|
public static class MetadataImageBuilder {
|
||||||
private TopicsDelta delta = new TopicsDelta(TopicsImage.EMPTY);
|
private MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
|
||||||
|
|
||||||
public TopicsImageBuilder addTopic(
|
public MetadataImageBuilder addTopic(
|
||||||
Uuid topicId,
|
Uuid topicId,
|
||||||
String topicName,
|
String topicName,
|
||||||
int numPartitions
|
int numPartitions
|
||||||
|
@ -130,8 +138,8 @@ public class GroupMetadataManagerTest {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public TopicsImage build() {
|
public MetadataImage build() {
|
||||||
return delta.apply();
|
return delta.apply(MetadataProvenance.EMPTY);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -141,6 +149,7 @@ public class GroupMetadataManagerTest {
|
||||||
private int assignmentEpoch;
|
private int assignmentEpoch;
|
||||||
private final Map<String, ConsumerGroupMember> members = new HashMap<>();
|
private final Map<String, ConsumerGroupMember> members = new HashMap<>();
|
||||||
private final Map<String, Assignment> assignments = new HashMap<>();
|
private final Map<String, Assignment> assignments = new HashMap<>();
|
||||||
|
private Map<String, TopicMetadata> subscriptionMetadata;
|
||||||
|
|
||||||
public ConsumerGroupBuilder(String groupId, int groupEpoch) {
|
public ConsumerGroupBuilder(String groupId, int groupEpoch) {
|
||||||
this.groupId = groupId;
|
this.groupId = groupId;
|
||||||
|
@ -153,6 +162,11 @@ public class GroupMetadataManagerTest {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ConsumerGroupBuilder withSubscriptionMetadata(Map<String, TopicMetadata> subscriptionMetadata) {
|
||||||
|
this.subscriptionMetadata = subscriptionMetadata;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public ConsumerGroupBuilder withAssignment(String memberId, Map<Uuid, Set<Integer>> assignment) {
|
public ConsumerGroupBuilder withAssignment(String memberId, Map<Uuid, Set<Integer>> assignment) {
|
||||||
this.assignments.put(memberId, new Assignment(assignment));
|
this.assignments.put(memberId, new Assignment(assignment));
|
||||||
return this;
|
return this;
|
||||||
|
@ -172,19 +186,21 @@ public class GroupMetadataManagerTest {
|
||||||
});
|
});
|
||||||
|
|
||||||
// Add subscription metadata.
|
// Add subscription metadata.
|
||||||
Map<String, TopicMetadata> subscriptionMetadata = new HashMap<>();
|
if (subscriptionMetadata == null) {
|
||||||
members.forEach((memberId, member) -> {
|
subscriptionMetadata = new HashMap<>();
|
||||||
member.subscribedTopicNames().forEach(topicName -> {
|
members.forEach((memberId, member) -> {
|
||||||
TopicImage topicImage = topicsImage.getTopic(topicName);
|
member.subscribedTopicNames().forEach(topicName -> {
|
||||||
if (topicImage != null) {
|
TopicImage topicImage = topicsImage.getTopic(topicName);
|
||||||
subscriptionMetadata.put(topicName, new TopicMetadata(
|
if (topicImage != null) {
|
||||||
topicImage.id(),
|
subscriptionMetadata.put(topicName, new TopicMetadata(
|
||||||
topicImage.name(),
|
topicImage.id(),
|
||||||
topicImage.partitions().size()
|
topicImage.name(),
|
||||||
));
|
topicImage.partitions().size()
|
||||||
}
|
));
|
||||||
|
}
|
||||||
|
});
|
||||||
});
|
});
|
||||||
});
|
}
|
||||||
|
|
||||||
if (!subscriptionMetadata.isEmpty()) {
|
if (!subscriptionMetadata.isEmpty()) {
|
||||||
records.add(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
|
records.add(RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
|
||||||
|
@ -212,15 +228,17 @@ public class GroupMetadataManagerTest {
|
||||||
|
|
||||||
static class GroupMetadataManagerTestContext {
|
static class GroupMetadataManagerTestContext {
|
||||||
static class Builder {
|
static class Builder {
|
||||||
|
final private Time time = new MockTime();
|
||||||
final private LogContext logContext = new LogContext();
|
final private LogContext logContext = new LogContext();
|
||||||
final private SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
|
final private SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
|
||||||
private TopicsImage topicsImage;
|
private MetadataImage metadataImage;
|
||||||
private List<PartitionAssignor> assignors;
|
private List<PartitionAssignor> assignors;
|
||||||
private List<ConsumerGroupBuilder> consumerGroupBuilders = new ArrayList<>();
|
private List<ConsumerGroupBuilder> consumerGroupBuilders = new ArrayList<>();
|
||||||
private int consumerGroupMaxSize = Integer.MAX_VALUE;
|
private int consumerGroupMaxSize = Integer.MAX_VALUE;
|
||||||
|
private int consumerGroupMetadataRefreshIntervalMs = Integer.MAX_VALUE;
|
||||||
|
|
||||||
public Builder withTopicsImage(TopicsImage topicsImage) {
|
public Builder withMetadataImage(MetadataImage metadataImage) {
|
||||||
this.topicsImage = topicsImage;
|
this.metadataImage = metadataImage;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -239,24 +257,32 @@ public class GroupMetadataManagerTest {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Builder withConsumerGroupMetadataRefreshIntervalMs(int consumerGroupMetadataRefreshIntervalMs) {
|
||||||
|
this.consumerGroupMetadataRefreshIntervalMs = consumerGroupMetadataRefreshIntervalMs;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public GroupMetadataManagerTestContext build() {
|
public GroupMetadataManagerTestContext build() {
|
||||||
if (topicsImage == null) topicsImage = TopicsImage.EMPTY;
|
if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
|
||||||
if (assignors == null) assignors = Collections.emptyList();
|
if (assignors == null) assignors = Collections.emptyList();
|
||||||
|
|
||||||
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext(
|
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext(
|
||||||
|
time,
|
||||||
snapshotRegistry,
|
snapshotRegistry,
|
||||||
new GroupMetadataManager.Builder()
|
new GroupMetadataManager.Builder()
|
||||||
.withSnapshotRegistry(snapshotRegistry)
|
.withSnapshotRegistry(snapshotRegistry)
|
||||||
.withLogContext(logContext)
|
.withLogContext(logContext)
|
||||||
.withTopicsImage(topicsImage)
|
.withTime(time)
|
||||||
|
.withMetadataImage(metadataImage)
|
||||||
.withConsumerGroupHeartbeatInterval(5000)
|
.withConsumerGroupHeartbeatInterval(5000)
|
||||||
.withConsumerGroupMaxSize(consumerGroupMaxSize)
|
.withConsumerGroupMaxSize(consumerGroupMaxSize)
|
||||||
.withAssignors(assignors)
|
.withAssignors(assignors)
|
||||||
|
.withConsumerGroupMetadataRefreshIntervalMs(consumerGroupMetadataRefreshIntervalMs)
|
||||||
.build()
|
.build()
|
||||||
);
|
);
|
||||||
|
|
||||||
consumerGroupBuilders.forEach(builder -> {
|
consumerGroupBuilders.forEach(builder -> {
|
||||||
builder.build(topicsImage).forEach(context::replay);
|
builder.build(metadataImage.topics()).forEach(context::replay);
|
||||||
});
|
});
|
||||||
|
|
||||||
context.commit();
|
context.commit();
|
||||||
|
@ -265,6 +291,7 @@ public class GroupMetadataManagerTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final Time time;
|
||||||
final SnapshotRegistry snapshotRegistry;
|
final SnapshotRegistry snapshotRegistry;
|
||||||
final GroupMetadataManager groupMetadataManager;
|
final GroupMetadataManager groupMetadataManager;
|
||||||
|
|
||||||
|
@ -272,9 +299,11 @@ public class GroupMetadataManagerTest {
|
||||||
long lastWrittenOffset = 0L;
|
long lastWrittenOffset = 0L;
|
||||||
|
|
||||||
public GroupMetadataManagerTestContext(
|
public GroupMetadataManagerTestContext(
|
||||||
|
Time time,
|
||||||
SnapshotRegistry snapshotRegistry,
|
SnapshotRegistry snapshotRegistry,
|
||||||
GroupMetadataManager groupMetadataManager
|
GroupMetadataManager groupMetadataManager
|
||||||
) {
|
) {
|
||||||
|
this.time = time;
|
||||||
this.snapshotRegistry = snapshotRegistry;
|
this.snapshotRegistry = snapshotRegistry;
|
||||||
this.groupMetadataManager = groupMetadataManager;
|
this.groupMetadataManager = groupMetadataManager;
|
||||||
}
|
}
|
||||||
|
@ -486,7 +515,7 @@ public class GroupMetadataManagerTest {
|
||||||
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
|
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
|
||||||
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
|
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
|
||||||
.withAssignors(Collections.singletonList(assignor))
|
.withAssignors(Collections.singletonList(assignor))
|
||||||
.withTopicsImage(TopicsImage.EMPTY)
|
.withMetadataImage(MetadataImage.EMPTY)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
assignor.prepareGroupAssignment(new GroupAssignment(
|
assignor.prepareGroupAssignment(new GroupAssignment(
|
||||||
|
@ -675,7 +704,7 @@ public class GroupMetadataManagerTest {
|
||||||
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
|
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
|
||||||
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
|
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
|
||||||
.withAssignors(Collections.singletonList(assignor))
|
.withAssignors(Collections.singletonList(assignor))
|
||||||
.withTopicsImage(new TopicsImageBuilder()
|
.withMetadataImage(new MetadataImageBuilder()
|
||||||
.addTopic(fooTopicId, fooTopicName, 6)
|
.addTopic(fooTopicId, fooTopicName, 6)
|
||||||
.addTopic(barTopicId, barTopicName, 3)
|
.addTopic(barTopicId, barTopicName, 3)
|
||||||
.build())
|
.build())
|
||||||
|
@ -764,7 +793,7 @@ public class GroupMetadataManagerTest {
|
||||||
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
|
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
|
||||||
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
|
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
|
||||||
.withAssignors(Collections.singletonList(assignor))
|
.withAssignors(Collections.singletonList(assignor))
|
||||||
.withTopicsImage(new TopicsImageBuilder()
|
.withMetadataImage(new MetadataImageBuilder()
|
||||||
.addTopic(fooTopicId, fooTopicName, 6)
|
.addTopic(fooTopicId, fooTopicName, 6)
|
||||||
.addTopic(barTopicId, barTopicName, 3)
|
.addTopic(barTopicId, barTopicName, 3)
|
||||||
.build())
|
.build())
|
||||||
|
@ -865,7 +894,7 @@ public class GroupMetadataManagerTest {
|
||||||
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
|
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
|
||||||
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
|
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
|
||||||
.withAssignors(Collections.singletonList(assignor))
|
.withAssignors(Collections.singletonList(assignor))
|
||||||
.withTopicsImage(new TopicsImageBuilder()
|
.withMetadataImage(new MetadataImageBuilder()
|
||||||
.addTopic(fooTopicId, fooTopicName, 6)
|
.addTopic(fooTopicId, fooTopicName, 6)
|
||||||
.addTopic(barTopicId, barTopicName, 3)
|
.addTopic(barTopicId, barTopicName, 3)
|
||||||
.build())
|
.build())
|
||||||
|
@ -1009,7 +1038,7 @@ public class GroupMetadataManagerTest {
|
||||||
// Consumer group with two members.
|
// Consumer group with two members.
|
||||||
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
|
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
|
||||||
.withAssignors(Collections.singletonList(assignor))
|
.withAssignors(Collections.singletonList(assignor))
|
||||||
.withTopicsImage(new TopicsImageBuilder()
|
.withMetadataImage(new MetadataImageBuilder()
|
||||||
.addTopic(fooTopicId, fooTopicName, 6)
|
.addTopic(fooTopicId, fooTopicName, 6)
|
||||||
.addTopic(barTopicId, barTopicName, 3)
|
.addTopic(barTopicId, barTopicName, 3)
|
||||||
.addTopic(zarTopicId, zarTopicName, 1)
|
.addTopic(zarTopicId, zarTopicName, 1)
|
||||||
|
@ -1100,7 +1129,7 @@ public class GroupMetadataManagerTest {
|
||||||
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
|
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
|
||||||
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
|
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
|
||||||
.withAssignors(Collections.singletonList(assignor))
|
.withAssignors(Collections.singletonList(assignor))
|
||||||
.withTopicsImage(new TopicsImageBuilder()
|
.withMetadataImage(new MetadataImageBuilder()
|
||||||
.addTopic(fooTopicId, fooTopicName, 6)
|
.addTopic(fooTopicId, fooTopicName, 6)
|
||||||
.addTopic(barTopicId, barTopicName, 3)
|
.addTopic(barTopicId, barTopicName, 3)
|
||||||
.build())
|
.build())
|
||||||
|
@ -1546,7 +1575,7 @@ public class GroupMetadataManagerTest {
|
||||||
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
|
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
|
||||||
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
|
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
|
||||||
.withAssignors(Collections.singletonList(assignor))
|
.withAssignors(Collections.singletonList(assignor))
|
||||||
.withTopicsImage(new TopicsImageBuilder()
|
.withMetadataImage(new MetadataImageBuilder()
|
||||||
.addTopic(fooTopicId, fooTopicName, 6)
|
.addTopic(fooTopicId, fooTopicName, 6)
|
||||||
.build())
|
.build())
|
||||||
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
|
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
|
||||||
|
@ -1794,7 +1823,7 @@ public class GroupMetadataManagerTest {
|
||||||
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
|
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
|
||||||
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
|
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
|
||||||
.withAssignors(Collections.singletonList(assignor))
|
.withAssignors(Collections.singletonList(assignor))
|
||||||
.withTopicsImage(new TopicsImageBuilder()
|
.withMetadataImage(new MetadataImageBuilder()
|
||||||
.addTopic(fooTopicId, fooTopicName, 6)
|
.addTopic(fooTopicId, fooTopicName, 6)
|
||||||
.addTopic(barTopicId, barTopicName, 3)
|
.addTopic(barTopicId, barTopicName, 3)
|
||||||
.build())
|
.build())
|
||||||
|
@ -1912,7 +1941,7 @@ public class GroupMetadataManagerTest {
|
||||||
|
|
||||||
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
|
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
|
||||||
.withAssignors(Collections.singletonList(assignor))
|
.withAssignors(Collections.singletonList(assignor))
|
||||||
.withTopicsImage(new TopicsImageBuilder()
|
.withMetadataImage(new MetadataImageBuilder()
|
||||||
.addTopic(fooTopicId, fooTopicName, 6)
|
.addTopic(fooTopicId, fooTopicName, 6)
|
||||||
.addTopic(barTopicId, barTopicName, 3)
|
.addTopic(barTopicId, barTopicName, 3)
|
||||||
.build())
|
.build())
|
||||||
|
@ -1932,6 +1961,435 @@ public class GroupMetadataManagerTest {
|
||||||
.setTopicPartitions(Collections.emptyList())));
|
.setTopicPartitions(Collections.emptyList())));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSubscriptionMetadataRefreshedAfterGroupIsLoaded() {
|
||||||
|
String groupId = "fooup";
|
||||||
|
// Use a static member id as it makes the test easier.
|
||||||
|
String memberId = Uuid.randomUuid().toString();
|
||||||
|
|
||||||
|
Uuid fooTopicId = Uuid.randomUuid();
|
||||||
|
String fooTopicName = "foo";
|
||||||
|
|
||||||
|
// Create a context with one consumer group containing one member.
|
||||||
|
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
|
||||||
|
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
|
||||||
|
.withAssignors(Collections.singletonList(assignor))
|
||||||
|
.withConsumerGroupMetadataRefreshIntervalMs(5 * 60 * 1000)
|
||||||
|
.withMetadataImage(new MetadataImageBuilder()
|
||||||
|
.addTopic(fooTopicId, fooTopicName, 6)
|
||||||
|
.build())
|
||||||
|
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
|
||||||
|
.withMember(new ConsumerGroupMember.Builder(memberId)
|
||||||
|
.setMemberEpoch(10)
|
||||||
|
.setPreviousMemberEpoch(10)
|
||||||
|
.setTargetMemberEpoch(10)
|
||||||
|
.setClientId("client")
|
||||||
|
.setClientHost("localhost/127.0.0.1")
|
||||||
|
.setRebalanceTimeoutMs(5000)
|
||||||
|
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
|
||||||
|
.setServerAssignorName("range")
|
||||||
|
.setAssignedPartitions(mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 0, 1, 2)))
|
||||||
|
.build())
|
||||||
|
.withAssignment(memberId, mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 0, 1, 2)))
|
||||||
|
.withAssignmentEpoch(10)
|
||||||
|
.withSubscriptionMetadata(new HashMap<String, TopicMetadata>() {
|
||||||
|
{
|
||||||
|
// foo only has 3 partitions stored in the metadata but foo has
|
||||||
|
// 6 partitions the metadata image.
|
||||||
|
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3));
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
// The metadata refresh flag should be true.
|
||||||
|
ConsumerGroup consumerGroup = context.groupMetadataManager
|
||||||
|
.getOrMaybeCreateConsumerGroup(groupId, false);
|
||||||
|
assertTrue(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
|
||||||
|
|
||||||
|
// Prepare the assignment result.
|
||||||
|
assignor.prepareGroupAssignment(new GroupAssignment(
|
||||||
|
Collections.singletonMap(memberId, new MemberAssignment(mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
|
||||||
|
)))
|
||||||
|
));
|
||||||
|
|
||||||
|
// Heartbeat.
|
||||||
|
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat(
|
||||||
|
new ConsumerGroupHeartbeatRequestData()
|
||||||
|
.setGroupId(groupId)
|
||||||
|
.setMemberId(memberId)
|
||||||
|
.setMemberEpoch(10));
|
||||||
|
|
||||||
|
// The member gets partitions 3, 4 and 5 assigned.
|
||||||
|
assertResponseEquals(
|
||||||
|
new ConsumerGroupHeartbeatResponseData()
|
||||||
|
.setMemberId(memberId)
|
||||||
|
.setMemberEpoch(11)
|
||||||
|
.setHeartbeatIntervalMs(5000)
|
||||||
|
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
|
||||||
|
.setAssignedTopicPartitions(Arrays.asList(
|
||||||
|
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
|
||||||
|
.setTopicId(fooTopicId)
|
||||||
|
.setPartitions(Arrays.asList(0, 1, 2, 3, 4, 5))
|
||||||
|
))),
|
||||||
|
result.response()
|
||||||
|
);
|
||||||
|
|
||||||
|
ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(memberId)
|
||||||
|
.setMemberEpoch(11)
|
||||||
|
.setPreviousMemberEpoch(10)
|
||||||
|
.setTargetMemberEpoch(11)
|
||||||
|
.setClientId("client")
|
||||||
|
.setClientHost("localhost/127.0.0.1")
|
||||||
|
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
|
||||||
|
.setServerAssignorName("range")
|
||||||
|
.setAssignedPartitions(mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
List<Record> expectedRecords = Arrays.asList(
|
||||||
|
RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
|
||||||
|
{
|
||||||
|
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6));
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
RecordHelpers.newGroupEpochRecord(groupId, 11),
|
||||||
|
RecordHelpers.newTargetAssignmentRecord(groupId, memberId, mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
|
||||||
|
)),
|
||||||
|
RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11),
|
||||||
|
RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember)
|
||||||
|
);
|
||||||
|
|
||||||
|
assertRecordsEquals(expectedRecords, result.records());
|
||||||
|
|
||||||
|
// Check next refresh time.
|
||||||
|
assertFalse(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
|
||||||
|
assertEquals(context.time.milliseconds() + 5 * 60 * 1000, consumerGroup.metadataRefreshDeadline().deadlineMs);
|
||||||
|
assertEquals(11, consumerGroup.metadataRefreshDeadline().epoch);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSubscriptionMetadataRefreshedAgainAfterWriteFailure() {
|
||||||
|
String groupId = "fooup";
|
||||||
|
// Use a static member id as it makes the test easier.
|
||||||
|
String memberId = Uuid.randomUuid().toString();
|
||||||
|
|
||||||
|
Uuid fooTopicId = Uuid.randomUuid();
|
||||||
|
String fooTopicName = "foo";
|
||||||
|
|
||||||
|
// Create a context with one consumer group containing one member.
|
||||||
|
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
|
||||||
|
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
|
||||||
|
.withAssignors(Collections.singletonList(assignor))
|
||||||
|
.withConsumerGroupMetadataRefreshIntervalMs(5 * 60 * 1000)
|
||||||
|
.withMetadataImage(new MetadataImageBuilder()
|
||||||
|
.addTopic(fooTopicId, fooTopicName, 6)
|
||||||
|
.build())
|
||||||
|
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
|
||||||
|
.withMember(new ConsumerGroupMember.Builder(memberId)
|
||||||
|
.setMemberEpoch(10)
|
||||||
|
.setPreviousMemberEpoch(10)
|
||||||
|
.setTargetMemberEpoch(10)
|
||||||
|
.setClientId("client")
|
||||||
|
.setClientHost("localhost/127.0.0.1")
|
||||||
|
.setRebalanceTimeoutMs(5000)
|
||||||
|
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
|
||||||
|
.setServerAssignorName("range")
|
||||||
|
.setAssignedPartitions(mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 0, 1, 2)))
|
||||||
|
.build())
|
||||||
|
.withAssignment(memberId, mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 0, 1, 2)))
|
||||||
|
.withAssignmentEpoch(10)
|
||||||
|
.withSubscriptionMetadata(new HashMap<String, TopicMetadata>() {
|
||||||
|
{
|
||||||
|
// foo only has 3 partitions stored in the metadata but foo has
|
||||||
|
// 6 partitions the metadata image.
|
||||||
|
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3));
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
// The metadata refresh flag should be true.
|
||||||
|
ConsumerGroup consumerGroup = context.groupMetadataManager
|
||||||
|
.getOrMaybeCreateConsumerGroup(groupId, false);
|
||||||
|
assertTrue(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
|
||||||
|
|
||||||
|
// Prepare the assignment result.
|
||||||
|
assignor.prepareGroupAssignment(new GroupAssignment(
|
||||||
|
Collections.singletonMap(memberId, new MemberAssignment(mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
|
||||||
|
)))
|
||||||
|
));
|
||||||
|
|
||||||
|
// Heartbeat.
|
||||||
|
context.consumerGroupHeartbeat(
|
||||||
|
new ConsumerGroupHeartbeatRequestData()
|
||||||
|
.setGroupId(groupId)
|
||||||
|
.setMemberId(memberId)
|
||||||
|
.setMemberEpoch(10));
|
||||||
|
|
||||||
|
// The metadata refresh flag is set to a future time.
|
||||||
|
assertFalse(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
|
||||||
|
assertEquals(context.time.milliseconds() + 5 * 60 * 1000, consumerGroup.metadataRefreshDeadline().deadlineMs);
|
||||||
|
assertEquals(11, consumerGroup.metadataRefreshDeadline().epoch);
|
||||||
|
|
||||||
|
// Rollback the uncommitted changes. This does not rollback the metadata flag
|
||||||
|
// because it is not using a timeline data structure.
|
||||||
|
context.rollback();
|
||||||
|
|
||||||
|
// However, the next heartbeat should detect the divergence based on the epoch and trigger
|
||||||
|
// a metadata refresh.
|
||||||
|
CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat(
|
||||||
|
new ConsumerGroupHeartbeatRequestData()
|
||||||
|
.setGroupId(groupId)
|
||||||
|
.setMemberId(memberId)
|
||||||
|
.setMemberEpoch(10));
|
||||||
|
|
||||||
|
|
||||||
|
// The member gets partitions 3, 4 and 5 assigned.
|
||||||
|
assertResponseEquals(
|
||||||
|
new ConsumerGroupHeartbeatResponseData()
|
||||||
|
.setMemberId(memberId)
|
||||||
|
.setMemberEpoch(11)
|
||||||
|
.setHeartbeatIntervalMs(5000)
|
||||||
|
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
|
||||||
|
.setAssignedTopicPartitions(Arrays.asList(
|
||||||
|
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
|
||||||
|
.setTopicId(fooTopicId)
|
||||||
|
.setPartitions(Arrays.asList(0, 1, 2, 3, 4, 5))
|
||||||
|
))),
|
||||||
|
result.response()
|
||||||
|
);
|
||||||
|
|
||||||
|
ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(memberId)
|
||||||
|
.setMemberEpoch(11)
|
||||||
|
.setPreviousMemberEpoch(10)
|
||||||
|
.setTargetMemberEpoch(11)
|
||||||
|
.setClientId("client")
|
||||||
|
.setClientHost("localhost/127.0.0.1")
|
||||||
|
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
|
||||||
|
.setServerAssignorName("range")
|
||||||
|
.setAssignedPartitions(mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
List<Record> expectedRecords = Arrays.asList(
|
||||||
|
RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
|
||||||
|
{
|
||||||
|
put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6));
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
RecordHelpers.newGroupEpochRecord(groupId, 11),
|
||||||
|
RecordHelpers.newTargetAssignmentRecord(groupId, memberId, mkAssignment(
|
||||||
|
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
|
||||||
|
)),
|
||||||
|
RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11),
|
||||||
|
RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember)
|
||||||
|
);
|
||||||
|
|
||||||
|
assertRecordsEquals(expectedRecords, result.records());
|
||||||
|
|
||||||
|
// Check next refresh time.
|
||||||
|
assertFalse(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
|
||||||
|
assertEquals(context.time.milliseconds() + 5 * 60 * 1000, consumerGroup.metadataRefreshDeadline().deadlineMs);
|
||||||
|
assertEquals(11, consumerGroup.metadataRefreshDeadline().epoch);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGroupIdsByTopics() {
|
||||||
|
String groupId1 = "group1";
|
||||||
|
String groupId2 = "group2";
|
||||||
|
|
||||||
|
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
|
||||||
|
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
|
||||||
|
.withAssignors(Collections.singletonList(assignor))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("foo"));
|
||||||
|
assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("bar"));
|
||||||
|
assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("zar"));
|
||||||
|
|
||||||
|
// M1 in group 1 subscribes to foo and bar.
|
||||||
|
context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId1,
|
||||||
|
new ConsumerGroupMember.Builder("group1-m1")
|
||||||
|
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
|
||||||
|
.build()));
|
||||||
|
|
||||||
|
assertEquals(mkSet(groupId1), context.groupMetadataManager.groupsSubscribedToTopic("foo"));
|
||||||
|
assertEquals(mkSet(groupId1), context.groupMetadataManager.groupsSubscribedToTopic("bar"));
|
||||||
|
assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("zar"));
|
||||||
|
|
||||||
|
// M1 in group 2 subscribes to foo, bar and zar.
|
||||||
|
context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId2,
|
||||||
|
new ConsumerGroupMember.Builder("group2-m1")
|
||||||
|
.setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar"))
|
||||||
|
.build()));
|
||||||
|
|
||||||
|
assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("foo"));
|
||||||
|
assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("bar"));
|
||||||
|
assertEquals(mkSet(groupId2), context.groupMetadataManager.groupsSubscribedToTopic("zar"));
|
||||||
|
|
||||||
|
// M2 in group 1 subscribes to bar and zar.
|
||||||
|
context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId1,
|
||||||
|
new ConsumerGroupMember.Builder("group1-m2")
|
||||||
|
.setSubscribedTopicNames(Arrays.asList("bar", "zar"))
|
||||||
|
.build()));
|
||||||
|
|
||||||
|
assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("foo"));
|
||||||
|
assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("bar"));
|
||||||
|
assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("zar"));
|
||||||
|
|
||||||
|
// M2 in group 2 subscribes to foo and bar.
|
||||||
|
context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId2,
|
||||||
|
new ConsumerGroupMember.Builder("group2-m2")
|
||||||
|
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
|
||||||
|
.build()));
|
||||||
|
|
||||||
|
assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("foo"));
|
||||||
|
assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("bar"));
|
||||||
|
assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("zar"));
|
||||||
|
|
||||||
|
// M1 in group 1 is removed.
|
||||||
|
context.replay(RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId1, "group1-m1"));
|
||||||
|
context.replay(RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId1, "group1-m1"));
|
||||||
|
|
||||||
|
assertEquals(mkSet(groupId2), context.groupMetadataManager.groupsSubscribedToTopic("foo"));
|
||||||
|
assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("bar"));
|
||||||
|
assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("zar"));
|
||||||
|
|
||||||
|
// M1 in group 2 subscribes to nothing.
|
||||||
|
context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId2,
|
||||||
|
new ConsumerGroupMember.Builder("group2-m1")
|
||||||
|
.setSubscribedTopicNames(Collections.emptyList())
|
||||||
|
.build()));
|
||||||
|
|
||||||
|
assertEquals(mkSet(groupId2), context.groupMetadataManager.groupsSubscribedToTopic("foo"));
|
||||||
|
assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("bar"));
|
||||||
|
assertEquals(mkSet(groupId1), context.groupMetadataManager.groupsSubscribedToTopic("zar"));
|
||||||
|
|
||||||
|
// M2 in group 2 subscribes to foo.
|
||||||
|
context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId2,
|
||||||
|
new ConsumerGroupMember.Builder("group2-m2")
|
||||||
|
.setSubscribedTopicNames(Arrays.asList("foo"))
|
||||||
|
.build()));
|
||||||
|
|
||||||
|
assertEquals(mkSet(groupId2), context.groupMetadataManager.groupsSubscribedToTopic("foo"));
|
||||||
|
assertEquals(mkSet(groupId1), context.groupMetadataManager.groupsSubscribedToTopic("bar"));
|
||||||
|
assertEquals(mkSet(groupId1), context.groupMetadataManager.groupsSubscribedToTopic("zar"));
|
||||||
|
|
||||||
|
// M2 in group 2 subscribes to nothing.
|
||||||
|
context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId2,
|
||||||
|
new ConsumerGroupMember.Builder("group2-m2")
|
||||||
|
.setSubscribedTopicNames(Collections.emptyList())
|
||||||
|
.build()));
|
||||||
|
|
||||||
|
assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("foo"));
|
||||||
|
assertEquals(mkSet(groupId1), context.groupMetadataManager.groupsSubscribedToTopic("bar"));
|
||||||
|
assertEquals(mkSet(groupId1), context.groupMetadataManager.groupsSubscribedToTopic("zar"));
|
||||||
|
|
||||||
|
// M2 in group 1 subscribes to nothing.
|
||||||
|
context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId1,
|
||||||
|
new ConsumerGroupMember.Builder("group1-m2")
|
||||||
|
.setSubscribedTopicNames(Collections.emptyList())
|
||||||
|
.build()));
|
||||||
|
|
||||||
|
assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("foo"));
|
||||||
|
assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("bar"));
|
||||||
|
assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("zar"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testOnNewMetadataImage() {
|
||||||
|
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
|
||||||
|
.withAssignors(Collections.singletonList(new MockPartitionAssignor("range")))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
// M1 in group 1 subscribes to a and b.
|
||||||
|
context.replay(RecordHelpers.newMemberSubscriptionRecord("group1",
|
||||||
|
new ConsumerGroupMember.Builder("group1-m1")
|
||||||
|
.setSubscribedTopicNames(Arrays.asList("a", "b"))
|
||||||
|
.build()));
|
||||||
|
|
||||||
|
// M1 in group 2 subscribes to b and c.
|
||||||
|
context.replay(RecordHelpers.newMemberSubscriptionRecord("group2",
|
||||||
|
new ConsumerGroupMember.Builder("group2-m1")
|
||||||
|
.setSubscribedTopicNames(Arrays.asList("b", "c"))
|
||||||
|
.build()));
|
||||||
|
|
||||||
|
// M1 in group 3 subscribes to d.
|
||||||
|
context.replay(RecordHelpers.newMemberSubscriptionRecord("group3",
|
||||||
|
new ConsumerGroupMember.Builder("group3-m1")
|
||||||
|
.setSubscribedTopicNames(Arrays.asList("d"))
|
||||||
|
.build()));
|
||||||
|
|
||||||
|
// M1 in group 4 subscribes to e.
|
||||||
|
context.replay(RecordHelpers.newMemberSubscriptionRecord("group4",
|
||||||
|
new ConsumerGroupMember.Builder("group4-m1")
|
||||||
|
.setSubscribedTopicNames(Arrays.asList("e"))
|
||||||
|
.build()));
|
||||||
|
|
||||||
|
// M1 in group 5 subscribes to f.
|
||||||
|
context.replay(RecordHelpers.newMemberSubscriptionRecord("group5",
|
||||||
|
new ConsumerGroupMember.Builder("group5-m1")
|
||||||
|
.setSubscribedTopicNames(Arrays.asList("f"))
|
||||||
|
.build()));
|
||||||
|
|
||||||
|
// Ensures that all refresh flags are set to the future.
|
||||||
|
Arrays.asList("group1", "group2", "group3", "group4", "group5").forEach(groupId -> {
|
||||||
|
ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);
|
||||||
|
group.setMetadataRefreshDeadline(context.time.milliseconds() + 5000L, 0);
|
||||||
|
assertFalse(group.hasMetadataExpired(context.time.milliseconds()));
|
||||||
|
});
|
||||||
|
|
||||||
|
// Update the metadata image.
|
||||||
|
Uuid topicA = Uuid.randomUuid();
|
||||||
|
Uuid topicB = Uuid.randomUuid();
|
||||||
|
Uuid topicC = Uuid.randomUuid();
|
||||||
|
Uuid topicD = Uuid.randomUuid();
|
||||||
|
Uuid topicE = Uuid.randomUuid();
|
||||||
|
|
||||||
|
// Create a first base image with topic a, b, c and d.
|
||||||
|
MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
|
||||||
|
delta.replay(new TopicRecord().setTopicId(topicA).setName("a"));
|
||||||
|
delta.replay(new PartitionRecord().setTopicId(topicA).setPartitionId(0));
|
||||||
|
delta.replay(new TopicRecord().setTopicId(topicB).setName("b"));
|
||||||
|
delta.replay(new PartitionRecord().setTopicId(topicB).setPartitionId(0));
|
||||||
|
delta.replay(new TopicRecord().setTopicId(topicC).setName("c"));
|
||||||
|
delta.replay(new PartitionRecord().setTopicId(topicC).setPartitionId(0));
|
||||||
|
delta.replay(new TopicRecord().setTopicId(topicD).setName("d"));
|
||||||
|
delta.replay(new PartitionRecord().setTopicId(topicD).setPartitionId(0));
|
||||||
|
MetadataImage image = delta.apply(MetadataProvenance.EMPTY);
|
||||||
|
|
||||||
|
// Create a delta which updates topic B, deletes topic D and creates topic E.
|
||||||
|
delta = new MetadataDelta(image);
|
||||||
|
delta.replay(new PartitionRecord().setTopicId(topicB).setPartitionId(2));
|
||||||
|
delta.replay(new RemoveTopicRecord().setTopicId(topicD));
|
||||||
|
delta.replay(new TopicRecord().setTopicId(topicE).setName("e"));
|
||||||
|
delta.replay(new PartitionRecord().setTopicId(topicE).setPartitionId(1));
|
||||||
|
image = delta.apply(MetadataProvenance.EMPTY);
|
||||||
|
|
||||||
|
// Update metadata image with the delta.
|
||||||
|
context.groupMetadataManager.onNewMetadataImage(image, delta);
|
||||||
|
|
||||||
|
// Verify the groups.
|
||||||
|
Arrays.asList("group1", "group2", "group3", "group4").forEach(groupId -> {
|
||||||
|
ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);
|
||||||
|
assertTrue(group.hasMetadataExpired(context.time.milliseconds()));
|
||||||
|
});
|
||||||
|
|
||||||
|
Arrays.asList("group5").forEach(groupId -> {
|
||||||
|
ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);
|
||||||
|
assertFalse(group.hasMetadataExpired(context.time.milliseconds()));
|
||||||
|
});
|
||||||
|
|
||||||
|
// Verify image.
|
||||||
|
assertEquals(image, context.groupMetadataManager.image());
|
||||||
|
}
|
||||||
|
|
||||||
private <T> void assertUnorderedListEquals(
|
private <T> void assertUnorderedListEquals(
|
||||||
List<T> expected,
|
List<T> expected,
|
||||||
List<T> actual
|
List<T> actual
|
||||||
|
|
|
@ -19,8 +19,9 @@ package org.apache.kafka.coordinator.group.consumer;
|
||||||
import org.apache.kafka.common.Uuid;
|
import org.apache.kafka.common.Uuid;
|
||||||
import org.apache.kafka.common.errors.UnknownMemberIdException;
|
import org.apache.kafka.common.errors.UnknownMemberIdException;
|
||||||
import org.apache.kafka.common.utils.LogContext;
|
import org.apache.kafka.common.utils.LogContext;
|
||||||
|
import org.apache.kafka.common.utils.MockTime;
|
||||||
import org.apache.kafka.coordinator.group.GroupMetadataManagerTest;
|
import org.apache.kafka.coordinator.group.GroupMetadataManagerTest;
|
||||||
import org.apache.kafka.image.TopicsImage;
|
import org.apache.kafka.image.MetadataImage;
|
||||||
import org.apache.kafka.timeline.SnapshotRegistry;
|
import org.apache.kafka.timeline.SnapshotRegistry;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
@ -392,7 +393,7 @@ public class ConsumerGroupTest {
|
||||||
Uuid barTopicId = Uuid.randomUuid();
|
Uuid barTopicId = Uuid.randomUuid();
|
||||||
Uuid zarTopicId = Uuid.randomUuid();
|
Uuid zarTopicId = Uuid.randomUuid();
|
||||||
|
|
||||||
TopicsImage image = new GroupMetadataManagerTest.TopicsImageBuilder()
|
MetadataImage image = new GroupMetadataManagerTest.MetadataImageBuilder()
|
||||||
.addTopic(fooTopicId, "foo", 1)
|
.addTopic(fooTopicId, "foo", 1)
|
||||||
.addTopic(barTopicId, "bar", 2)
|
.addTopic(barTopicId, "bar", 2)
|
||||||
.addTopic(zarTopicId, "zar", 3)
|
.addTopic(zarTopicId, "zar", 3)
|
||||||
|
@ -416,7 +417,7 @@ public class ConsumerGroupTest {
|
||||||
consumerGroup.computeSubscriptionMetadata(
|
consumerGroup.computeSubscriptionMetadata(
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
image
|
image.topics()
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -428,7 +429,7 @@ public class ConsumerGroupTest {
|
||||||
consumerGroup.computeSubscriptionMetadata(
|
consumerGroup.computeSubscriptionMetadata(
|
||||||
null,
|
null,
|
||||||
member1,
|
member1,
|
||||||
image
|
image.topics()
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -443,7 +444,7 @@ public class ConsumerGroupTest {
|
||||||
consumerGroup.computeSubscriptionMetadata(
|
consumerGroup.computeSubscriptionMetadata(
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
image
|
image.topics()
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -453,7 +454,7 @@ public class ConsumerGroupTest {
|
||||||
consumerGroup.computeSubscriptionMetadata(
|
consumerGroup.computeSubscriptionMetadata(
|
||||||
member1,
|
member1,
|
||||||
null,
|
null,
|
||||||
image
|
image.topics()
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -466,7 +467,7 @@ public class ConsumerGroupTest {
|
||||||
consumerGroup.computeSubscriptionMetadata(
|
consumerGroup.computeSubscriptionMetadata(
|
||||||
null,
|
null,
|
||||||
member2,
|
member2,
|
||||||
image
|
image.topics()
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -482,7 +483,7 @@ public class ConsumerGroupTest {
|
||||||
consumerGroup.computeSubscriptionMetadata(
|
consumerGroup.computeSubscriptionMetadata(
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
image
|
image.topics()
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -494,7 +495,7 @@ public class ConsumerGroupTest {
|
||||||
consumerGroup.computeSubscriptionMetadata(
|
consumerGroup.computeSubscriptionMetadata(
|
||||||
member2,
|
member2,
|
||||||
null,
|
null,
|
||||||
image
|
image.topics()
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -506,7 +507,7 @@ public class ConsumerGroupTest {
|
||||||
consumerGroup.computeSubscriptionMetadata(
|
consumerGroup.computeSubscriptionMetadata(
|
||||||
member1,
|
member1,
|
||||||
null,
|
null,
|
||||||
image
|
image.topics()
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -520,7 +521,7 @@ public class ConsumerGroupTest {
|
||||||
consumerGroup.computeSubscriptionMetadata(
|
consumerGroup.computeSubscriptionMetadata(
|
||||||
null,
|
null,
|
||||||
member3,
|
member3,
|
||||||
image
|
image.topics()
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -537,8 +538,57 @@ public class ConsumerGroupTest {
|
||||||
consumerGroup.computeSubscriptionMetadata(
|
consumerGroup.computeSubscriptionMetadata(
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
image
|
image.topics()
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMetadataRefreshDeadline() {
|
||||||
|
MockTime time = new MockTime();
|
||||||
|
ConsumerGroup group = createConsumerGroup("group-foo");
|
||||||
|
|
||||||
|
// Group epoch starts at 0.
|
||||||
|
assertEquals(0, group.groupEpoch());
|
||||||
|
|
||||||
|
// The refresh time deadline should be empty when the group is created or loaded.
|
||||||
|
assertTrue(group.hasMetadataExpired(time.milliseconds()));
|
||||||
|
assertEquals(0L, group.metadataRefreshDeadline().deadlineMs);
|
||||||
|
assertEquals(0, group.metadataRefreshDeadline().epoch);
|
||||||
|
|
||||||
|
// Set the refresh deadline. The metadata remains valid because the deadline
|
||||||
|
// has not past and the group epoch is correct.
|
||||||
|
group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch());
|
||||||
|
assertFalse(group.hasMetadataExpired(time.milliseconds()));
|
||||||
|
assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs);
|
||||||
|
assertEquals(group.groupEpoch(), group.metadataRefreshDeadline().epoch);
|
||||||
|
|
||||||
|
// Advance past the deadline. The metadata should have expired.
|
||||||
|
time.sleep(1001L);
|
||||||
|
assertTrue(group.hasMetadataExpired(time.milliseconds()));
|
||||||
|
|
||||||
|
// Set the refresh time deadline with a higher group epoch. The metadata is considered
|
||||||
|
// as expired because the group epoch attached to the deadline is higher than the
|
||||||
|
// current group epoch.
|
||||||
|
group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch() + 1);
|
||||||
|
assertTrue(group.hasMetadataExpired(time.milliseconds()));
|
||||||
|
assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs);
|
||||||
|
assertEquals(group.groupEpoch() + 1, group.metadataRefreshDeadline().epoch);
|
||||||
|
|
||||||
|
// Advance the group epoch.
|
||||||
|
group.setGroupEpoch(group.groupEpoch() + 1);
|
||||||
|
|
||||||
|
// Set the refresh deadline. The metadata remains valid because the deadline
|
||||||
|
// has not past and the group epoch is correct.
|
||||||
|
group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch());
|
||||||
|
assertFalse(group.hasMetadataExpired(time.milliseconds()));
|
||||||
|
assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs);
|
||||||
|
assertEquals(group.groupEpoch(), group.metadataRefreshDeadline().epoch);
|
||||||
|
|
||||||
|
// Request metadata refresh. The metadata expires immediately.
|
||||||
|
group.requestMetadataRefresh();
|
||||||
|
assertTrue(group.hasMetadataExpired(time.milliseconds()));
|
||||||
|
assertEquals(0L, group.metadataRefreshDeadline().deadlineMs);
|
||||||
|
assertEquals(0, group.metadataRefreshDeadline().epoch);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,9 @@ import org.apache.kafka.common.KafkaException;
|
||||||
import org.apache.kafka.common.TopicPartition;
|
import org.apache.kafka.common.TopicPartition;
|
||||||
import org.apache.kafka.common.errors.NotCoordinatorException;
|
import org.apache.kafka.common.errors.NotCoordinatorException;
|
||||||
import org.apache.kafka.common.utils.LogContext;
|
import org.apache.kafka.common.utils.LogContext;
|
||||||
|
import org.apache.kafka.image.MetadataDelta;
|
||||||
|
import org.apache.kafka.image.MetadataImage;
|
||||||
|
import org.apache.kafka.image.MetadataProvenance;
|
||||||
import org.apache.kafka.timeline.SnapshotRegistry;
|
import org.apache.kafka.timeline.SnapshotRegistry;
|
||||||
import org.apache.kafka.timeline.TimelineHashSet;
|
import org.apache.kafka.timeline.TimelineHashSet;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
@ -222,7 +225,7 @@ public class CoordinatorRuntimeTest {
|
||||||
assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);
|
assertEquals(CoordinatorRuntime.CoordinatorState.ACTIVE, ctx.state);
|
||||||
|
|
||||||
// Verify that onLoaded is called.
|
// Verify that onLoaded is called.
|
||||||
verify(coordinator, times(1)).onLoaded();
|
verify(coordinator, times(1)).onLoaded(MetadataImage.EMPTY);
|
||||||
|
|
||||||
// Verify that the listener is registered.
|
// Verify that the listener is registered.
|
||||||
verify(writer, times(1)).registerListener(
|
verify(writer, times(1)).registerListener(
|
||||||
|
@ -834,4 +837,60 @@ public class CoordinatorRuntimeTest {
|
||||||
// Verify that the loader was closed.
|
// Verify that the loader was closed.
|
||||||
verify(loader).close();
|
verify(loader).close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testOnNewMetadataImage() {
|
||||||
|
TopicPartition tp0 = new TopicPartition("__consumer_offsets", 0);
|
||||||
|
TopicPartition tp1 = new TopicPartition("__consumer_offsets", 1);
|
||||||
|
|
||||||
|
MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
|
||||||
|
MockPartitionWriter writer = mock(MockPartitionWriter.class);
|
||||||
|
MockCoordinatorBuilderSupplier supplier = mock(MockCoordinatorBuilderSupplier.class);
|
||||||
|
MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class);
|
||||||
|
|
||||||
|
CoordinatorRuntime<MockCoordinator, String> runtime =
|
||||||
|
new CoordinatorRuntime.Builder<MockCoordinator, String>()
|
||||||
|
.withLoader(loader)
|
||||||
|
.withEventProcessor(new MockEventProcessor())
|
||||||
|
.withPartitionWriter(writer)
|
||||||
|
.withCoordinatorBuilderSupplier(supplier)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
MockCoordinator coordinator0 = mock(MockCoordinator.class);
|
||||||
|
MockCoordinator coordinator1 = mock(MockCoordinator.class);
|
||||||
|
|
||||||
|
when(supplier.get()).thenReturn(builder);
|
||||||
|
when(builder.withSnapshotRegistry(any())).thenReturn(builder);
|
||||||
|
when(builder.withLogContext(any())).thenReturn(builder);
|
||||||
|
when(builder.build())
|
||||||
|
.thenReturn(coordinator0)
|
||||||
|
.thenReturn(coordinator1);
|
||||||
|
|
||||||
|
CompletableFuture<Void> future0 = new CompletableFuture<>();
|
||||||
|
when(loader.load(tp0, coordinator0)).thenReturn(future0);
|
||||||
|
|
||||||
|
CompletableFuture<Void> future1 = new CompletableFuture<>();
|
||||||
|
when(loader.load(tp1, coordinator1)).thenReturn(future1);
|
||||||
|
|
||||||
|
runtime.scheduleLoadOperation(tp0, 0);
|
||||||
|
runtime.scheduleLoadOperation(tp1, 0);
|
||||||
|
|
||||||
|
// Coordinator 0 is loaded. It should get the current image
|
||||||
|
// that is the empty one.
|
||||||
|
future0.complete(null);
|
||||||
|
verify(coordinator0).onLoaded(MetadataImage.EMPTY);
|
||||||
|
|
||||||
|
// Publish a new image.
|
||||||
|
MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
|
||||||
|
MetadataImage newImage = delta.apply(MetadataProvenance.EMPTY);
|
||||||
|
runtime.onNewMetadataImage(newImage, delta);
|
||||||
|
|
||||||
|
// Coordinator 0 should be notified about it.
|
||||||
|
verify(coordinator0).onNewMetadataImage(newImage, delta);
|
||||||
|
|
||||||
|
// Coordinator 1 is loaded. It should get the current image
|
||||||
|
// that is the new image.
|
||||||
|
future1.complete(null);
|
||||||
|
verify(coordinator1).onLoaded(newImage);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue