diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 1de0e400fce..f69f6c8a33a 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -340,7 +340,7 @@ + files="(GroupMetadataManager|ConsumerGroupTest|ShareGroupTest|GroupMetadataManagerTest|GroupMetadataManagerTestContext|GeneralUniformAssignmentBuilder).java"/> shareGroupHeartbeat( + RequestContext context, + ShareGroupHeartbeatRequestData request + ); + /** * Join a Classic Group. * @@ -174,6 +191,19 @@ public interface GroupCoordinator { List groupIds ); + /** + * Describe share groups. + * + * @param context The coordinator request context. + * @param groupIds The group ids. + * + * @return A future yielding the results or an exception. + */ + CompletableFuture> shareGroupDescribe( + RequestContext context, + List groupIds + ); + /** * Delete Groups. * diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index 8d4845b23b6..764cccc1513 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -40,6 +40,9 @@ import org.apache.kafka.common.message.OffsetDeleteRequestData; import org.apache.kafka.common.message.OffsetDeleteResponseData; import org.apache.kafka.common.message.OffsetFetchRequestData; import org.apache.kafka.common.message.OffsetFetchResponseData; +import org.apache.kafka.common.message.ShareGroupDescribeResponseData.DescribedGroup; +import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData; +import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData; import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupResponseData; import org.apache.kafka.common.message.TxnOffsetCommitRequestData; @@ -308,6 +311,18 @@ public class GroupCoordinatorService implements GroupCoordinator { )); } + /** + * See {@link GroupCoordinator#shareGroupHeartbeat(RequestContext, ShareGroupHeartbeatRequestData)}. + */ + @Override + public CompletableFuture shareGroupHeartbeat( + RequestContext context, + ShareGroupHeartbeatRequestData request + ) { + // TODO: Implement this method as part of KIP-932. + throw new UnsupportedOperationException(); + } + /** * See {@link GroupCoordinator#joinGroup(RequestContext, JoinGroupRequestData, BufferSupplier)}. */ @@ -589,6 +604,17 @@ public class GroupCoordinatorService implements GroupCoordinator { return FutureUtils.combineFutures(futures, ArrayList::new, List::addAll); } + /** + * See {@link GroupCoordinator#shareGroupDescribe(RequestContext, List)}. + */ + @Override + public CompletableFuture> shareGroupDescribe( + RequestContext context, + List groupIds) { + // TODO: Implement this method as part of KIP-932. + throw new UnsupportedOperationException(); + } + /** * See {@link GroupCoordinator#describeGroups(RequestContext, List)}. */ diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java new file mode 100644 index 00000000000..4cc66468ae7 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.api.assignor.GroupSpec; +import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.api.assignor.MemberSubscription; +import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.api.assignor.ShareGroupPartitionAssignor; +import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS; + +/** + * A simple partition assignor that assigns each member all partitions of the subscribed topics. + */ +public class SimpleAssignor implements ShareGroupPartitionAssignor { + + private static final String SIMPLE_ASSIGNOR_NAME = "simple"; + + @Override + public String name() { + return SIMPLE_ASSIGNOR_NAME; + } + + @Override + public GroupAssignment assign( + GroupSpec groupSpec, + SubscribedTopicDescriber subscribedTopicDescriber + ) throws PartitionAssignorException { + if (groupSpec.memberIds().isEmpty()) + return new GroupAssignment(Collections.emptyMap()); + + if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) { + return assignHomogenous(groupSpec, subscribedTopicDescriber); + } else { + return assignHeterogeneous(groupSpec, subscribedTopicDescriber); + } + } + + private GroupAssignment assignHomogenous( + GroupSpec groupSpec, + SubscribedTopicDescriber subscribedTopicDescriber + ) { + Set subscribeTopicIds = groupSpec.memberSubscription(groupSpec.memberIds().iterator().next()) + .subscribedTopicIds(); + if (subscribeTopicIds.isEmpty()) + return new GroupAssignment(Collections.emptyMap()); + + Map> targetPartitions = computeTargetPartitions( + subscribeTopicIds, subscribedTopicDescriber); + + return new GroupAssignment(groupSpec.memberIds().stream().collect(Collectors.toMap( + Function.identity(), memberId -> new MemberAssignmentImpl(targetPartitions)))); + } + + private GroupAssignment assignHeterogeneous( + GroupSpec groupSpec, + SubscribedTopicDescriber subscribedTopicDescriber + ) { + Map members = new HashMap<>(); + for (String memberId : groupSpec.memberIds()) { + MemberSubscription spec = groupSpec.memberSubscription(memberId); + if (spec.subscribedTopicIds().isEmpty()) + continue; + + Map> targetPartitions = computeTargetPartitions( + spec.subscribedTopicIds(), subscribedTopicDescriber); + + members.put(memberId, new MemberAssignmentImpl(targetPartitions)); + } + return new GroupAssignment(members); + } + + private Map> computeTargetPartitions( + Set subscribeTopicIds, + SubscribedTopicDescriber subscribedTopicDescriber + ) { + Map> targetPartitions = new HashMap<>(); + subscribeTopicIds.forEach(topicId -> { + int numPartitions = subscribedTopicDescriber.numPartitions(topicId); + if (numPartitions == -1) { + throw new PartitionAssignorException( + "Members are subscribed to topic " + topicId + + " which doesn't exist in the topic metadata." + ); + } + + Set partitions = new HashSet<>(); + for (int i = 0; i < numPartitions; i++) { + partitions.add(i); + } + targetPartitions.put(topicId, partitions); + }); + return targetPartitions; + } +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java index 4f2114cc44e..537929b4a2a 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java @@ -591,6 +591,73 @@ public abstract class ModernGroup implements Group return HOMOGENEOUS; } + /** + * Removes the partition epochs based on the provided assignment. + * + * @param assignment The assignment. + * @param expectedEpoch The expected epoch. + * @throws IllegalStateException if the epoch does not match the expected one. + * package-private for testing. + */ + public void removePartitionEpochs( + Map> assignment, + int expectedEpoch + ) { + assignment.forEach((topicId, assignedPartitions) -> { + currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> { + if (partitionsOrNull != null) { + assignedPartitions.forEach(partitionId -> { + Integer prevValue = partitionsOrNull.remove(partitionId); + if (prevValue != expectedEpoch) { + throw new IllegalStateException( + String.format("Cannot remove the epoch %d from %s-%s because the partition is " + + "still owned at a different epoch %d", expectedEpoch, topicId, partitionId, prevValue)); + } + }); + if (partitionsOrNull.isEmpty()) { + return null; + } else { + return partitionsOrNull; + } + } else { + throw new IllegalStateException( + String.format("Cannot remove the epoch %d from %s because it does not have any epoch", + expectedEpoch, topicId)); + } + }); + }); + } + + /** + * Adds the partitions epoch based on the provided assignment. + * + * @param assignment The assignment. + * @param epoch The new epoch. + * @throws IllegalStateException if the partition already has an epoch assigned. + * package-private for testing. + */ + public void addPartitionEpochs( + Map> assignment, + int epoch + ) { + assignment.forEach((topicId, assignedPartitions) -> { + currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> { + if (partitionsOrNull == null) { + partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, assignedPartitions.size()); + } + for (Integer partitionId : assignedPartitions) { + Integer prevValue = partitionsOrNull.put(partitionId, epoch); + if (prevValue != null) { + throw new IllegalStateException( + String.format("Cannot set the epoch of %s-%s to %d because the partition is " + + "still owned at epoch %d", topicId, partitionId, epoch, prevValue)); + } + } + return partitionsOrNull; + }); + }); + } + /** * Gets the protocol type for the group. * diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroupMember.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroupMember.java index 243dbef41c0..1b844d24ccd 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroupMember.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroupMember.java @@ -56,11 +56,6 @@ public abstract class ModernGroupMember { */ protected String rackId; - /** - * The rebalance timeout provided by the member. - */ - protected int rebalanceTimeoutMs; - /** * The client id reported by the member. */ @@ -87,7 +82,6 @@ public abstract class ModernGroupMember { int previousMemberEpoch, String instanceId, String rackId, - int rebalanceTimeoutMs, String clientId, String clientHost, Set subscribedTopicNames, @@ -100,7 +94,6 @@ public abstract class ModernGroupMember { this.state = state; this.instanceId = instanceId; this.rackId = rackId; - this.rebalanceTimeoutMs = rebalanceTimeoutMs; this.clientId = clientId; this.clientHost = clientHost; this.subscribedTopicNames = subscribedTopicNames; @@ -142,13 +135,6 @@ public abstract class ModernGroupMember { return rackId; } - /** - * @return The rebalance timeout in millis. - */ - public int rebalanceTimeoutMs() { - return rebalanceTimeoutMs; - } - /** * @return The client id. */ diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java index 2acceae609d..483e1c7e969 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java @@ -677,73 +677,6 @@ public class ConsumerGroup extends ModernGroup { } } - /** - * Removes the partition epochs based on the provided assignment. - * - * @param assignment The assignment. - * @param expectedEpoch The expected epoch. - * @throws IllegalStateException if the epoch does not match the expected one. - * package-private for testing. - */ - void removePartitionEpochs( - Map> assignment, - int expectedEpoch - ) { - assignment.forEach((topicId, assignedPartitions) -> { - currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> { - if (partitionsOrNull != null) { - assignedPartitions.forEach(partitionId -> { - Integer prevValue = partitionsOrNull.remove(partitionId); - if (prevValue != expectedEpoch) { - throw new IllegalStateException( - String.format("Cannot remove the epoch %d from %s-%s because the partition is " + - "still owned at a different epoch %d", expectedEpoch, topicId, partitionId, prevValue)); - } - }); - if (partitionsOrNull.isEmpty()) { - return null; - } else { - return partitionsOrNull; - } - } else { - throw new IllegalStateException( - String.format("Cannot remove the epoch %d from %s because it does not have any epoch", - expectedEpoch, topicId)); - } - }); - }); - } - - /** - * Adds the partitions epoch based on the provided assignment. - * - * @param assignment The assignment. - * @param epoch The new epoch. - * @throws IllegalStateException if the partition already has an epoch assigned. - * package-private for testing. - */ - void addPartitionEpochs( - Map> assignment, - int epoch - ) { - assignment.forEach((topicId, assignedPartitions) -> { - currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> { - if (partitionsOrNull == null) { - partitionsOrNull = new TimelineHashMap<>(snapshotRegistry, assignedPartitions.size()); - } - for (Integer partitionId : assignedPartitions) { - Integer prevValue = partitionsOrNull.put(partitionId, epoch); - if (prevValue != null) { - throw new IllegalStateException( - String.format("Cannot set the epoch of %s-%s to %d because the partition is " + - "still owned at epoch %d", topicId, partitionId, epoch, prevValue)); - } - } - return partitionsOrNull; - }); - }); - } - public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup( long committedOffset, String defaultAssignor, diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java index cc8d32f7f86..3b59d820a70 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java @@ -250,6 +250,10 @@ public class ConsumerGroupMember extends ModernGroupMember { } } + /** + * The rebalance timeout provided by the member. + */ + private int rebalanceTimeoutMs; /** * The subscription pattern configured by the member. @@ -294,19 +298,26 @@ public class ConsumerGroupMember extends ModernGroupMember { previousMemberEpoch, instanceId, rackId, - rebalanceTimeoutMs, clientId, clientHost, subscribedTopicNames, state, assignedPartitions ); + this.rebalanceTimeoutMs = rebalanceTimeoutMs; this.subscribedTopicRegex = subscribedTopicRegex; this.serverAssignorName = serverAssignorName; this.partitionsPendingRevocation = partitionsPendingRevocation; this.classicMemberMetadata = classicMemberMetadata; } + /** + * @return The rebalance timeout in millis. + */ + public int rebalanceTimeoutMs() { + return rebalanceTimeoutMs; + } + /** * @return The regular expression based subscription. */ diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java new file mode 100644 index 00000000000..2da7983aef6 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.modern.share; + +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.message.ShareGroupDescribeResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.coordinator.group.CoordinatorRecord; +import org.apache.kafka.coordinator.group.CoordinatorRecordHelpers; +import org.apache.kafka.coordinator.group.OffsetExpirationCondition; +import org.apache.kafka.coordinator.group.modern.ModernGroup; +import org.apache.kafka.image.TopicsImage; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineObject; + +import java.util.List; +import java.util.Locale; +import java.util.Optional; +import java.util.Set; + +/** + * A Share Group. + */ +public class ShareGroup extends ModernGroup { + + private static final String PROTOCOL_TYPE = "share"; + + public enum ShareGroupState { + EMPTY("Empty"), + STABLE("Stable"), + DEAD("Dead"), + UNKNOWN("Unknown"); + + private final String name; + + private final String lowerCaseName; + + ShareGroupState(String name) { + this.name = name; + this.lowerCaseName = name.toLowerCase(Locale.ROOT); + } + + @Override + public String toString() { + return name; + } + + public String toLowerCaseString() { + return lowerCaseName; + } + } + + /** + * The group state. + */ + private final TimelineObject state; + + public ShareGroup( + SnapshotRegistry snapshotRegistry, + String groupId + ) { + super(snapshotRegistry, groupId); + this.state = new TimelineObject<>(snapshotRegistry, ShareGroupState.EMPTY); + } + + /** + * @return The group type (Share). + */ + @Override + public GroupType type() { + return GroupType.SHARE; + } + + /** + * @return The group protocol type (share). + */ + @Override + public String protocolType() { + return PROTOCOL_TYPE; + } + + /** + * @return The current state as a String. + */ + @Override + public String stateAsString() { + return state.get().toString(); + } + + /** + * @return The current state as a String with given committedOffset. + */ + public String stateAsString(long committedOffset) { + return state.get(committedOffset).toString(); + } + + /** + * @return The current state. + */ + public ShareGroupState state() { + return state.get(); + } + + /** + * @return The current state based on committed offset. + */ + public ShareGroupState state(long committedOffset) { + return state.get(committedOffset); + } + + /** + * Gets or creates a member. + * + * @param memberId The member id. + * @param createIfNotExists Booleans indicating whether the member must be + * created if it does not exist. + * + * @return A ShareGroupMember. + */ + public ShareGroupMember getOrMaybeCreateMember( + String memberId, + boolean createIfNotExists + ) { + ShareGroupMember member = members.get(memberId); + if (member != null) return member; + + if (!createIfNotExists) { + throw new UnknownMemberIdException( + String.format("Member %s is not a member of group %s.", memberId, groupId)); + } + + return new ShareGroupMember.Builder(memberId).build(); + } + + /** + * Updates the member. + * + * @param newMember The new share group member. + */ + @Override + public void updateMember(ShareGroupMember newMember) { + if (newMember == null) { + throw new IllegalArgumentException("newMember cannot be null."); + } + + ShareGroupMember oldMember = members.put(newMember.memberId(), newMember); + maybeUpdateSubscribedTopicNamesAndGroupSubscriptionType(oldMember, newMember); + maybeUpdatePartitionEpoch(oldMember, newMember); + maybeUpdateGroupState(); + } + + /** + * Remove the member from the group. + * + * @param memberId The member id to remove. + */ + public void removeMember(String memberId) { + ShareGroupMember oldMember = members.remove(memberId); + maybeUpdateSubscribedTopicNamesAndGroupSubscriptionType(oldMember, null); + maybeRemovePartitionEpoch(oldMember); + maybeUpdateGroupState(); + } + + @Override + public void validateOffsetCommit( + String memberId, + String groupInstanceId, + int memberEpoch, + boolean isTransactional, + short apiVersion + ) { + throw new UnsupportedOperationException("validateOffsetCommit is not supported for Share Groups."); + } + + @Override + public void validateOffsetFetch( + String memberId, + int memberEpoch, + long lastCommittedOffset + ) { + throw new UnsupportedOperationException("validateOffsetFetch is not supported for Share Groups."); + } + + @Override + public void validateOffsetDelete() { + throw new UnsupportedOperationException("validateOffsetDelete is not supported for Share Groups."); + } + + /** + * Validates the DeleteGroups request. + */ + @Override + public void validateDeleteGroup() throws ApiException { + if (state() != ShareGroupState.EMPTY) { + throw Errors.NON_EMPTY_GROUP.exception(); + } + } + + /** + * Populates the list of records with tombstone(s) for deleting the group. + * + * @param records The list of records. + */ + @Override + public void createGroupTombstoneRecords(List records) { + records.add(CoordinatorRecordHelpers.newGroupEpochTombstoneRecord(groupId(), GroupType.SHARE)); + } + + @Override + public boolean isEmpty() { + return state() == ShareGroupState.EMPTY; + } + + @Override + public Optional offsetExpirationCondition() { + throw new UnsupportedOperationException("offsetExpirationCondition is not supported for Share Groups."); + } + + @Override + public boolean isInStates(final Set statesFilter, final long committedOffset) { + return statesFilter.contains(state.get(committedOffset).toLowerCaseString()); + } + + /** + * Updates the current state of the group. + */ + @Override + protected void maybeUpdateGroupState() { + ShareGroupState newState = ShareGroupState.STABLE; + if (members.isEmpty()) { + newState = ShareGroupState.EMPTY; + } + + state.set(newState); + } + + /** + * Updates the partition epochs based on the old and the new member. + * + * @param oldMember The old member. + * @param newMember The new member. + */ + private void maybeUpdatePartitionEpoch( + ShareGroupMember oldMember, + ShareGroupMember newMember + ) { + maybeRemovePartitionEpoch(oldMember); + addPartitionEpochs(newMember.assignedPartitions(), newMember.memberEpoch()); + } + + /** + * Removes the partition epochs for the provided member. + * + * @param oldMember The old member. + */ + private void maybeRemovePartitionEpoch( + ShareGroupMember oldMember + ) { + if (oldMember != null) { + removePartitionEpochs(oldMember.assignedPartitions(), oldMember.memberEpoch()); + } + } + + public ShareGroupDescribeResponseData.DescribedGroup asDescribedGroup( + long committedOffset, + String defaultAssignor, + TopicsImage topicsImage + ) { + ShareGroupDescribeResponseData.DescribedGroup describedGroup = new ShareGroupDescribeResponseData.DescribedGroup() + .setGroupId(groupId) + .setAssignorName(defaultAssignor) + .setGroupEpoch(groupEpoch.get(committedOffset)) + .setGroupState(state.get(committedOffset).toString()) + .setAssignmentEpoch(targetAssignmentEpoch.get(committedOffset)); + members.entrySet(committedOffset).forEach( + entry -> describedGroup.members().add( + entry.getValue().asShareGroupDescribeMember( + topicsImage + ) + ) + ); + return describedGroup; + } +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupMember.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupMember.java new file mode 100644 index 00000000000..9f817f28cd3 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupMember.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.modern.share; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ShareGroupDescribeResponseData; +import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue; +import org.apache.kafka.coordinator.group.modern.MemberState; +import org.apache.kafka.coordinator.group.modern.ModernGroupMember; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.image.TopicsImage; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +/** + * ShareGroupMember contains all the information related to a member + * within a share group. This class is immutable. + */ +public class ShareGroupMember extends ModernGroupMember { + + /** + * A builder that facilitates the creation of a new member or the update of + * an existing one. + *

+ * Please refer to the javadoc of {{@link ShareGroupMember}} for the + * definition of the fields. + */ + public static class Builder { + private final String memberId; + private int memberEpoch = 0; + private int previousMemberEpoch = -1; + private MemberState state = MemberState.STABLE; + private String rackId = null; + private String clientId = ""; + private String clientHost = ""; + private Set subscribedTopicNames = Collections.emptySet(); + private Map> assignedPartitions = Collections.emptyMap(); + + public Builder(String memberId) { + this.memberId = Objects.requireNonNull(memberId); + } + + public Builder(ShareGroupMember member) { + this( + Objects.requireNonNull(member), + member.memberId + ); + } + + public Builder(ShareGroupMember member, String newMemberId) { + Objects.requireNonNull(member); + + this.memberId = Objects.requireNonNull(newMemberId); + this.memberEpoch = member.memberEpoch; + this.previousMemberEpoch = member.previousMemberEpoch; + this.rackId = member.rackId; + this.clientId = member.clientId; + this.clientHost = member.clientHost; + this.subscribedTopicNames = member.subscribedTopicNames; + this.assignedPartitions = member.assignedPartitions; + } + + public Builder setMemberEpoch(int memberEpoch) { + this.memberEpoch = memberEpoch; + return this; + } + + public Builder setPreviousMemberEpoch(int previousMemberEpoch) { + this.previousMemberEpoch = previousMemberEpoch; + return this; + } + + public Builder setRackId(String rackId) { + this.rackId = rackId; + return this; + } + + public Builder maybeUpdateRackId(Optional rackId) { + this.rackId = rackId.orElse(this.rackId); + return this; + } + + public Builder setClientId(String clientId) { + this.clientId = clientId; + return this; + } + + public Builder setClientHost(String clientHost) { + this.clientHost = clientHost; + return this; + } + + public Builder setSubscribedTopicNames(List subscribedTopicList) { + if (subscribedTopicNames != null) this.subscribedTopicNames = new HashSet<>(subscribedTopicList); + return this; + } + + public Builder maybeUpdateSubscribedTopicNames(Optional> subscribedTopicList) { + subscribedTopicList.ifPresent(list -> this.subscribedTopicNames = new HashSet<>(list)); + return this; + } + + public Builder setState(MemberState state) { + this.state = state; + return this; + } + + public Builder setAssignedPartitions(Map> assignedPartitions) { + this.assignedPartitions = assignedPartitions; + return this; + } + + public Builder updateWith(ShareGroupMemberMetadataValue record) { + setRackId(record.rackId()); + setClientId(record.clientId()); + setClientHost(record.clientHost()); + setSubscribedTopicNames(record.subscribedTopicNames()); + return this; + } + + public ShareGroupMember build() { + return new ShareGroupMember( + memberId, + memberEpoch, + previousMemberEpoch, + rackId, + clientId, + clientHost, + subscribedTopicNames, + state, + assignedPartitions + ); + } + } + + private ShareGroupMember( + String memberId, + int memberEpoch, + int previousMemberEpoch, + String rackId, + String clientId, + String clientHost, + Set subscribedTopicNames, + MemberState state, + Map> assignedPartitions + ) { + super( + memberId, + memberEpoch, + previousMemberEpoch, + null, + rackId, + clientId, + clientHost, + subscribedTopicNames, + state, + assignedPartitions + ); + } + + /** + * Converts this ShareGroupMember to a ShareGroupDescribeResponseData.Member. + * + * @param topicsImage: Topics image object to search for a specific topic id + * + * @return The ShareGroupMember mapped as ShareGroupDescribeResponseData.Member. + */ + public ShareGroupDescribeResponseData.Member asShareGroupDescribeMember( + TopicsImage topicsImage + ) { + return new ShareGroupDescribeResponseData.Member() + .setMemberEpoch(memberEpoch) + .setMemberId(memberId) + .setAssignment(new ShareGroupDescribeResponseData.Assignment() + .setTopicPartitions(topicPartitionsFromMap(assignedPartitions, topicsImage))) + .setClientHost(clientHost) + .setClientId(clientId) + .setRackId(rackId) + .setSubscribedTopicNames(subscribedTopicNames == null ? null : new ArrayList<>(subscribedTopicNames)); + } + + private static List topicPartitionsFromMap( + Map> partitions, + TopicsImage topicsImage + ) { + List topicPartitions = new ArrayList<>(); + partitions.forEach((topicId, partitionSet) -> { + TopicImage topicImage = topicsImage.getTopic(topicId); + if (topicImage != null) { + topicPartitions.add(new ShareGroupDescribeResponseData.TopicPartitions() + .setTopicId(topicId) + .setTopicName(topicImage.name()) + .setPartitions(new ArrayList<>(partitionSet))); + } + }); + return topicPartitions; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ShareGroupMember that = (ShareGroupMember) o; + return memberEpoch == that.memberEpoch + && previousMemberEpoch == that.previousMemberEpoch + && state == that.state + && Objects.equals(memberId, that.memberId) + && Objects.equals(rackId, that.rackId) + && Objects.equals(clientId, that.clientId) + && Objects.equals(clientHost, that.clientHost) + && Objects.equals(subscribedTopicNames, that.subscribedTopicNames) + && Objects.equals(assignedPartitions, that.assignedPartitions); + } + + @Override + public int hashCode() { + int result = memberId != null ? memberId.hashCode() : 0; + result = 31 * result + memberEpoch; + result = 31 * result + previousMemberEpoch; + result = 31 * result + Objects.hashCode(state); + result = 31 * result + Objects.hashCode(rackId); + result = 31 * result + Objects.hashCode(clientId); + result = 31 * result + Objects.hashCode(clientHost); + result = 31 * result + Objects.hashCode(subscribedTopicNames); + result = 31 * result + Objects.hashCode(assignedPartitions); + return result; + } + + @Override + public String toString() { + return "ShareGroupMember(" + + "memberId='" + memberId + '\'' + + ", memberEpoch=" + memberEpoch + '\'' + + ", previousMemberEpoch=" + previousMemberEpoch + '\'' + + ", state='" + state + '\'' + + ", rackId='" + rackId + '\'' + + ", clientId='" + clientId + '\'' + + ", clientHost='" + clientHost + '\'' + + ", subscribedTopicNames=" + subscribedTopicNames + '\'' + + ", assignedPartitions=" + assignedPartitions + + ')'; + } +} diff --git a/group-coordinator/src/main/resources/common/message/ShareGroupMemberMetadataKey.json b/group-coordinator/src/main/resources/common/message/ShareGroupMemberMetadataKey.json new file mode 100644 index 00000000000..cbe1a8dd871 --- /dev/null +++ b/group-coordinator/src/main/resources/common/message/ShareGroupMemberMetadataKey.json @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// KIP-932 is in development. This schema is subject to non-backwards-compatible changes. +{ + "type": "data", + "name": "ShareGroupMemberMetadataKey", + "validVersions": "10", + "flexibleVersions": "none", + "fields": [ + { "name": "GroupId", "type": "string", "versions": "10", + "about": "The group id." }, + { "name": "MemberId", "type": "string", "versions": "10", + "about": "The member id." } + ] +} diff --git a/group-coordinator/src/main/resources/common/message/ShareGroupMemberMetadataValue.json b/group-coordinator/src/main/resources/common/message/ShareGroupMemberMetadataValue.json new file mode 100644 index 00000000000..c5d4c7abd45 --- /dev/null +++ b/group-coordinator/src/main/resources/common/message/ShareGroupMemberMetadataValue.json @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// KIP-932 is in development. This schema is subject to non-backwards-compatible changes. +{ + "type": "data", + "name": "ShareGroupMemberMetadataValue", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "RackId", "versions": "0+", "nullableVersions": "0+", "type": "string", + "about": "The (optional) rack id." }, + { "name": "ClientId", "versions": "0+", "type": "string", + "about": "The client id." }, + { "name": "ClientHost", "versions": "0+", "type": "string", + "about": "The client host." }, + { "name": "SubscribedTopicNames", "versions": "0+", "type": "[]string", + "about": "The list of subscribed topic names." } + ] +} diff --git a/group-coordinator/src/main/resources/common/message/ShareGroupMetadataKey.json b/group-coordinator/src/main/resources/common/message/ShareGroupMetadataKey.json new file mode 100644 index 00000000000..309b67ba31f --- /dev/null +++ b/group-coordinator/src/main/resources/common/message/ShareGroupMetadataKey.json @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// KIP-932 is in development. This schema is subject to non-backwards-compatible changes. +{ + "type": "data", + "name": "ShareGroupMetadataKey", + "validVersions": "11", + "flexibleVersions": "none", + "fields": [ + { "name": "GroupId", "type": "string", "versions": "11", + "about": "The group id." } + ] +} diff --git a/group-coordinator/src/main/resources/common/message/ShareGroupMetadataValue.json b/group-coordinator/src/main/resources/common/message/ShareGroupMetadataValue.json new file mode 100644 index 00000000000..02ca3eacd04 --- /dev/null +++ b/group-coordinator/src/main/resources/common/message/ShareGroupMetadataValue.json @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// KIP-932 is in development. This schema is subject to non-backwards-compatible changes. +{ + "type": "data", + "name": "ShareGroupMetadataValue", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "Epoch", "type": "int32", "versions": "0+", + "about": "The group epoch." } + ] +} diff --git a/group-coordinator/src/main/resources/common/message/ShareGroupPartitionMetadataKey.json b/group-coordinator/src/main/resources/common/message/ShareGroupPartitionMetadataKey.json new file mode 100644 index 00000000000..4b51978ecae --- /dev/null +++ b/group-coordinator/src/main/resources/common/message/ShareGroupPartitionMetadataKey.json @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// KIP-932 is in development. This schema is subject to non-backwards-compatible changes. + +{ + "type": "data", + "name": "ShareGroupPartitionMetadataKey", + "validVersions": "9", + "flexibleVersions": "none", + "fields": [ + { "name": "GroupId", "type": "string", "versions": "9", + "about": "The group id." } + ] +} diff --git a/group-coordinator/src/main/resources/common/message/ShareGroupPartitionMetadataValue.json b/group-coordinator/src/main/resources/common/message/ShareGroupPartitionMetadataValue.json new file mode 100644 index 00000000000..4510a6ffe55 --- /dev/null +++ b/group-coordinator/src/main/resources/common/message/ShareGroupPartitionMetadataValue.json @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// KIP-932 is in development. This schema is subject to non-backwards-compatible changes. +{ + "type": "data", + "name": "ShareGroupPartitionMetadataValue", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "InitializedTopics", "versions": "0+", "type": "[]TopicPartitionsInfo", + "about": "The topics with initialized share-group state." }, + { "name": "DeletingTopics", "versions": "0+", "type": "[]TopicInfo", + "about": "The topics whose share-group state is being deleted." } + ], + "commonStructs": [ + { "name": "TopicPartitionsInfo", "versions": "0+", "fields": [ + { "name": "TopicId", "type": "uuid", "versions": "0+", + "about": "The topic identifier." }, + { "name": "TopicName", "type": "string", "versions": "0+", + "about": "The topic name." }, + { "name": "Partitions", "type": "[]int32", "versions": "0+", + "about": "The partitions." } + ]}, + { "name": "TopicInfo", "versions": "0+", "fields": [ + { "name": "TopicId", "type": "uuid", "versions": "0+", + "about": "The topic identifier." }, + { "name": "TopicName", "type": "string", "versions": "0+", + "about": "The topic name." } + ]} + ] +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/CoordinatorRecordHelpersTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/CoordinatorRecordHelpersTest.java index 2c0c0d555aa..431e9d90774 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/CoordinatorRecordHelpersTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/CoordinatorRecordHelpersTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProt import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.coordinator.group.Group.GroupType; import org.apache.kafka.coordinator.group.classic.ClassicGroup; import org.apache.kafka.coordinator.group.classic.ClassicGroupMember; import org.apache.kafka.coordinator.group.classic.ClassicGroupState; @@ -41,6 +42,7 @@ import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; import org.apache.kafka.coordinator.group.generated.OffsetCommitKey; import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; +import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey; import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard; import org.apache.kafka.coordinator.group.modern.MemberState; import org.apache.kafka.coordinator.group.modern.TopicMetadata; @@ -292,6 +294,27 @@ public class CoordinatorRecordHelpersTest { )); } + @Test + public void testNewGroupEpochTombstoneRecordShareGroup() { + CoordinatorRecord expectedRecord = new CoordinatorRecord( + new ApiMessageAndVersion( + new ShareGroupMetadataKey() + .setGroupId("group-id"), + (short) 11), + null); + + assertEquals(expectedRecord, newGroupEpochTombstoneRecord( + "group-id", GroupType.SHARE + )); + } + + @Test + public void testNewGroupEpochTombstoneRecordUnknownGroup() { + assertThrows(IllegalArgumentException.class, () -> newGroupEpochTombstoneRecord( + "group-id", GroupType.UNKNOWN + )); + } + @Test public void testNewTargetAssignmentRecord() { Uuid topicId1 = Uuid.randomUuid(); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java new file mode 100644 index 00000000000..f15bde38469 --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.api.assignor.GroupSpec; +import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException; +import org.apache.kafka.coordinator.group.modern.Assignment; +import org.apache.kafka.coordinator.group.modern.GroupSpecImpl; +import org.apache.kafka.coordinator.group.modern.MemberSubscriptionAndAssignmentImpl; +import org.apache.kafka.coordinator.group.modern.SubscribedTopicDescriberImpl; +import org.apache.kafka.coordinator.group.modern.TopicMetadata; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; + +import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; +import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS; +import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class SimpleAssignorTest { + + private static final Uuid TOPIC_1_UUID = Uuid.randomUuid(); + private static final Uuid TOPIC_2_UUID = Uuid.randomUuid(); + private static final Uuid TOPIC_3_UUID = Uuid.randomUuid(); + private static final String TOPIC_1_NAME = "topic1"; + private static final String TOPIC_3_NAME = "topic3"; + private static final String MEMBER_A = "A"; + private static final String MEMBER_B = "B"; + + private final SimpleAssignor assignor = new SimpleAssignor(); + + @Test + public void testName() { + assertEquals("simple", assignor.name()); + } + + @Test + public void testAssignWithEmptyMembers() { + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl( + Collections.emptyMap() + ); + + GroupSpec groupSpec = new GroupSpecImpl( + Collections.emptyMap(), + HOMOGENEOUS, + Collections.emptyMap() + ); + + GroupAssignment groupAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); + + assertEquals(Collections.emptyMap(), groupAssignment.members()); + } + + @Test + public void testAssignWithNoSubscribedTopic() { + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl( + Collections.singletonMap( + TOPIC_1_UUID, + new TopicMetadata( + TOPIC_1_UUID, + TOPIC_1_NAME, + 3, + Collections.emptyMap() + ) + ) + ); + + Map members = Collections.singletonMap( + MEMBER_A, + new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + Collections.emptySet(), + Assignment.EMPTY + ) + ); + + GroupSpec groupSpec = new GroupSpecImpl( + members, + HOMOGENEOUS, + Collections.emptyMap() + ); + + GroupAssignment groupAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); + + assertEquals(Collections.emptyMap(), groupAssignment.members()); + } + + @Test + public void testAssignWithSubscribedToNonExistentTopic() { + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl( + Collections.singletonMap( + TOPIC_1_UUID, + new TopicMetadata( + TOPIC_1_UUID, + TOPIC_1_NAME, + 3, + Collections.emptyMap() + ) + ) + ); + + Map members = Collections.singletonMap( + MEMBER_A, + new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + mkSet(TOPIC_2_UUID), + Assignment.EMPTY + ) + ); + + GroupSpec groupSpec = new GroupSpecImpl( + members, + HOMOGENEOUS, + Collections.emptyMap() + ); + + assertThrows(PartitionAssignorException.class, + () -> assignor.assign(groupSpec, subscribedTopicMetadata)); + } + + @Test + public void testAssignWithTwoMembersAndTwoTopicsHomogeneous() { + Map topicMetadata = new HashMap<>(); + topicMetadata.put(TOPIC_1_UUID, new TopicMetadata( + TOPIC_1_UUID, + TOPIC_1_NAME, + 3, + Collections.emptyMap() + )); + topicMetadata.put(TOPIC_3_UUID, new TopicMetadata( + TOPIC_3_UUID, + TOPIC_3_NAME, + 2, + Collections.emptyMap() + )); + + Map members = new TreeMap<>(); + + members.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + mkSet(TOPIC_1_UUID, TOPIC_3_UUID), + Assignment.EMPTY + )); + + members.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + mkSet(TOPIC_1_UUID, TOPIC_3_UUID), + Assignment.EMPTY + )); + + GroupSpec groupSpec = new GroupSpecImpl( + members, + HOMOGENEOUS, + Collections.emptyMap() + ); + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata); + + GroupAssignment computedAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); + + Map>> expectedAssignment = new HashMap<>(); + expectedAssignment.put(MEMBER_A, mkAssignment( + mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2), + mkTopicAssignment(TOPIC_3_UUID, 0, 1) + )); + expectedAssignment.put(MEMBER_B, mkAssignment( + mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2), + mkTopicAssignment(TOPIC_3_UUID, 0, 1) + )); + + assertAssignment(expectedAssignment, computedAssignment); + } + + @Test + public void testAssignWithThreeMembersThreeTopicsHeterogeneous() { + Map topicMetadata = new HashMap<>(); + topicMetadata.put(TOPIC_1_UUID, new TopicMetadata( + TOPIC_1_UUID, + TOPIC_1_NAME, + 3, + Collections.emptyMap() + )); + + topicMetadata.put(TOPIC_2_UUID, new TopicMetadata( + TOPIC_2_UUID, + "topic2", + 3, + Collections.emptyMap() + )); + topicMetadata.put(TOPIC_3_UUID, new TopicMetadata( + TOPIC_3_UUID, + TOPIC_3_NAME, + 2, + Collections.emptyMap() + )); + + Map members = new TreeMap<>(); + members.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + mkSet(TOPIC_1_UUID, TOPIC_2_UUID), + Assignment.EMPTY + )); + + members.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + mkSet(TOPIC_3_UUID), + Assignment.EMPTY + )); + + String memberC = "C"; + members.put(memberC, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + mkSet(TOPIC_2_UUID, TOPIC_3_UUID), + Assignment.EMPTY + )); + + GroupSpec groupSpec = new GroupSpecImpl( + members, + HETEROGENEOUS, + Collections.emptyMap() + ); + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata); + + GroupAssignment computedAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); + + Map>> expectedAssignment = new HashMap<>(); + expectedAssignment.put(MEMBER_A, mkAssignment( + mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2), + mkTopicAssignment(TOPIC_2_UUID, 0, 1, 2) + )); + expectedAssignment.put(MEMBER_B, mkAssignment( + mkTopicAssignment(TOPIC_3_UUID, 0, 1) + )); + expectedAssignment.put(memberC, mkAssignment( + mkTopicAssignment(TOPIC_2_UUID, 0, 1, 2), + mkTopicAssignment(TOPIC_3_UUID, 0, 1) + )); + + assertAssignment(expectedAssignment, computedAssignment); + } + + @Test + public void testAssignWithOneMemberNoAssignedTopicHeterogeneous() { + Map topicMetadata = new HashMap<>(); + topicMetadata.put(TOPIC_1_UUID, new TopicMetadata( + TOPIC_1_UUID, + TOPIC_1_NAME, + 3, + Collections.emptyMap() + )); + + topicMetadata.put(TOPIC_2_UUID, new TopicMetadata( + TOPIC_2_UUID, + "topic2", + 2, + Collections.emptyMap() + )); + + Map members = new TreeMap<>(); + members.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + mkSet(TOPIC_1_UUID, TOPIC_2_UUID), + Assignment.EMPTY + )); + + members.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + Collections.emptySet(), + Assignment.EMPTY + )); + + GroupSpec groupSpec = new GroupSpecImpl( + members, + HETEROGENEOUS, + Collections.emptyMap() + ); + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata); + + GroupAssignment computedAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); + + Map>> expectedAssignment = new HashMap<>(); + expectedAssignment.put(MEMBER_A, mkAssignment( + mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2), + mkTopicAssignment(TOPIC_2_UUID, 0, 1) + )); + + assertAssignment(expectedAssignment, computedAssignment); + } + + private void assertAssignment( + Map>> expectedAssignment, + GroupAssignment computedGroupAssignment + ) { + assertEquals(expectedAssignment.size(), computedGroupAssignment.members().size()); + for (String memberId : computedGroupAssignment.members().keySet()) { + Map> computedAssignmentForMember = computedGroupAssignment.members().get(memberId).partitions(); + assertEquals(expectedAssignment.get(memberId), computedAssignmentForMember); + } + } +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupMemberTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupMemberTest.java new file mode 100644 index 00000000000..16a137cf823 --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupMemberTest.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.modern.share; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ShareGroupDescribeResponseData; +import org.apache.kafka.coordinator.group.MetadataImageBuilder; +import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue; +import org.apache.kafka.image.MetadataImage; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class ShareGroupMemberTest { + + @Test + public void testNewMember() { + Uuid topicId1 = Uuid.randomUuid(); + + ShareGroupMember member = new ShareGroupMember.Builder("member-id") + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setRackId("rack-id") + .setClientId("client-id") + .setClientHost("hostname") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(topicId1, 1, 2, 3))) + .build(); + + assertEquals("member-id", member.memberId()); + assertEquals(10, member.memberEpoch()); + assertEquals(9, member.previousMemberEpoch()); + assertNull(member.instanceId()); + assertEquals("rack-id", member.rackId()); + assertEquals("client-id", member.clientId()); + assertEquals("hostname", member.clientHost()); + assertEquals(mkSet("bar", "foo"), member.subscribedTopicNames()); + assertEquals(mkAssignment(mkTopicAssignment(topicId1, 1, 2, 3)), member.assignedPartitions()); + } + + @Test + public void testEquals() { + Uuid topicId1 = Uuid.randomUuid(); + + ShareGroupMember member1 = new ShareGroupMember.Builder("member-id") + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setRackId("rack-id") + .setClientId("client-id") + .setClientHost("hostname") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(topicId1, 1, 2, 3))) + .build(); + + ShareGroupMember member2 = new ShareGroupMember.Builder("member-id") + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setRackId("rack-id") + .setClientId("client-id") + .setClientHost("hostname") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(topicId1, 1, 2, 3))) + .build(); + + assertEquals(member1, member2); + } + + @Test + public void testUpdateMember() { + Uuid topicId1 = Uuid.randomUuid(); + + ShareGroupMember member = new ShareGroupMember.Builder("member-id") + .setMemberEpoch(10) + .setPreviousMemberEpoch(9) + .setRackId("rack-id") + .setClientId("client-id") + .setClientHost("hostname") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(topicId1, 1, 2, 3))) + .build(); + + // This is a no-op. + ShareGroupMember updatedMember = new ShareGroupMember.Builder(member) + .maybeUpdateRackId(Optional.empty()) + .maybeUpdateSubscribedTopicNames(Optional.empty()) + .build(); + + assertEquals(member, updatedMember); + + updatedMember = new ShareGroupMember.Builder(member) + .maybeUpdateRackId(Optional.of("new-rack-id")) + .maybeUpdateSubscribedTopicNames(Optional.of(Collections.singletonList("zar"))) + .build(); + + assertNull(member.instanceId()); + assertEquals("new-rack-id", updatedMember.rackId()); + // Names are sorted. + assertEquals(mkSet("zar"), updatedMember.subscribedTopicNames()); + } + + @Test + public void testUpdateWithShareGroupMemberMetadataValue() { + ShareGroupMemberMetadataValue record = new ShareGroupMemberMetadataValue() + .setClientId("client-id") + .setClientHost("host-id") + .setRackId("rack-id") + .setSubscribedTopicNames(Arrays.asList("foo", "bar")); + + ShareGroupMember member = new ShareGroupMember.Builder("member-id") + .updateWith(record) + .build(); + + assertNull(member.instanceId()); + assertEquals("rack-id", member.rackId()); + assertEquals("client-id", member.clientId()); + assertEquals("host-id", member.clientHost()); + assertEquals(mkSet("bar", "foo"), member.subscribedTopicNames()); + } + + @Test + public void testAsShareGroupDescribeMember() { + Uuid topicId1 = Uuid.randomUuid(); + Uuid topicId2 = Uuid.randomUuid(); + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(topicId1, "topic1", 3) + .addTopic(topicId2, "topic2", 3) + .build(); + List subscribedTopicNames = Arrays.asList("topic1", "topic2"); + List assignedPartitions = Arrays.asList(0, 1, 2); + int epoch = 10; + ShareGroupMemberMetadataValue record = new ShareGroupMemberMetadataValue() + .setClientId("client-id") + .setClientHost("host-id") + .setRackId("rack-id") + .setSubscribedTopicNames(subscribedTopicNames); + + String memberId = Uuid.randomUuid().toString(); + ShareGroupMember member = new ShareGroupMember.Builder(memberId) + .updateWith(record) + .setMemberEpoch(epoch) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(topicId1, 0, 1, 2))) + .build(); + + ShareGroupDescribeResponseData.Member actual = member.asShareGroupDescribeMember(metadataImage.topics()); + ShareGroupDescribeResponseData.Member expected = new ShareGroupDescribeResponseData.Member() + .setMemberId(memberId) + .setMemberEpoch(epoch) + .setClientId("client-id") + .setRackId("rack-id") + .setClientHost("host-id") + .setSubscribedTopicNames(subscribedTopicNames) + .setAssignment( + new ShareGroupDescribeResponseData.Assignment() + .setTopicPartitions(Collections.singletonList(new ShareGroupDescribeResponseData.TopicPartitions() + .setTopicId(topicId1) + .setTopicName("topic1") + .setPartitions(assignedPartitions) + )) + ); + + assertEquals(expected, actual); + } +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java new file mode 100644 index 00000000000..3310e080744 --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java @@ -0,0 +1,841 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.modern.share; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.GroupNotEmptyException; +import org.apache.kafka.common.errors.UnknownMemberIdException; +import org.apache.kafka.common.message.ShareGroupDescribeResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource; +import org.apache.kafka.coordinator.group.Group; +import org.apache.kafka.coordinator.group.MetadataImageBuilder; +import org.apache.kafka.coordinator.group.modern.Assignment; +import org.apache.kafka.coordinator.group.modern.MemberState; +import org.apache.kafka.coordinator.group.modern.TopicMetadata; +import org.apache.kafka.coordinator.group.modern.share.ShareGroup.ShareGroupState; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.timeline.SnapshotRegistry; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; +import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpersTest.mkMapOfPartitionRacks; +import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS; +import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ShareGroupTest { + + @Test + public void testType() { + ShareGroup shareGroup = createShareGroup("foo"); + assertEquals(Group.GroupType.SHARE, shareGroup.type()); + } + + @Test + public void testProtocolType() { + ShareGroup shareGroup = createShareGroup("foo"); + assertEquals("share", shareGroup.protocolType()); + } + + @Test + public void testGetOrCreateMember() { + ShareGroup shareGroup = createShareGroup("foo"); + ShareGroupMember member; + + // Create a member. + member = shareGroup.getOrMaybeCreateMember("member-id", true); + assertEquals("member-id", member.memberId()); + + // Add member to the group. + shareGroup.updateMember(member); + + // Get that member back. + member = shareGroup.getOrMaybeCreateMember("member-id", false); + assertEquals("member-id", member.memberId()); + + assertThrows(UnknownMemberIdException.class, () -> + shareGroup.getOrMaybeCreateMember("does-not-exist", false)); + } + + @Test + public void testUpdateMember() { + ShareGroup shareGroup = createShareGroup("foo"); + ShareGroupMember member; + + member = shareGroup.getOrMaybeCreateMember("member", true); + + member = new ShareGroupMember.Builder(member) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .build(); + + shareGroup.updateMember(member); + + assertEquals(member, shareGroup.getOrMaybeCreateMember("member", false)); + } + + @Test + public void testRemoveMember() { + ShareGroup shareGroup = createShareGroup("foo"); + + ShareGroupMember member = shareGroup.getOrMaybeCreateMember("member", true); + shareGroup.updateMember(member); + assertTrue(shareGroup.hasMember("member")); + + shareGroup.removeMember("member"); + assertFalse(shareGroup.hasMember("member")); + + } + + @Test + public void testUpdatingMemberUpdatesPartitionEpoch() { + Uuid fooTopicId = Uuid.randomUuid(); + Uuid barTopicId = Uuid.randomUuid(); + + ShareGroup shareGroup = createShareGroup("foo"); + ShareGroupMember member; + + member = new ShareGroupMember.Builder("member") + .setMemberEpoch(10) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3))) + .build(); + + shareGroup.updateMember(member); + + assertEquals(10, shareGroup.currentPartitionEpoch(fooTopicId, 1)); + assertEquals(10, shareGroup.currentPartitionEpoch(fooTopicId, 2)); + assertEquals(10, shareGroup.currentPartitionEpoch(fooTopicId, 3)); + assertEquals(-1, shareGroup.currentPartitionEpoch(barTopicId, 4)); + assertEquals(-1, shareGroup.currentPartitionEpoch(barTopicId, 5)); + assertEquals(-1, shareGroup.currentPartitionEpoch(barTopicId, 6)); + + member = new ShareGroupMember.Builder(member) + .setMemberEpoch(11) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(barTopicId, 1, 2, 3))) + .build(); + + shareGroup.updateMember(member); + + assertEquals(11, shareGroup.currentPartitionEpoch(barTopicId, 1)); + assertEquals(11, shareGroup.currentPartitionEpoch(barTopicId, 2)); + assertEquals(11, shareGroup.currentPartitionEpoch(barTopicId, 3)); + assertEquals(-1, shareGroup.currentPartitionEpoch(fooTopicId, 1)); + assertEquals(-1, shareGroup.currentPartitionEpoch(fooTopicId, 2)); + assertEquals(-1, shareGroup.currentPartitionEpoch(fooTopicId, 3)); + } + + @Test + public void testRemovePartitionEpochs() { + Uuid fooTopicId = Uuid.randomUuid(); + ShareGroup shareGroup = createShareGroup("foo"); + + // Removing should fail because there is no epoch set. + assertThrows(IllegalStateException.class, () -> shareGroup.removePartitionEpochs( + mkAssignment( + mkTopicAssignment(fooTopicId, 1) + ), + 10 + )); + + ShareGroupMember m1 = new ShareGroupMember.Builder("m1") + .setMemberEpoch(10) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 1))) + .build(); + + shareGroup.updateMember(m1); + + // Removing should fail because the expected epoch is incorrect. + assertThrows(IllegalStateException.class, () -> shareGroup.removePartitionEpochs( + mkAssignment( + mkTopicAssignment(fooTopicId, 1) + ), + 11 + )); + } + + @Test + public void testAddPartitionEpochs() { + Uuid fooTopicId = Uuid.randomUuid(); + ShareGroup shareGroup = createShareGroup("foo"); + + shareGroup.addPartitionEpochs( + mkAssignment( + mkTopicAssignment(fooTopicId, 1) + ), + 10 + ); + + // Changing the epoch should fail because the owner of the partition + // should remove it first. + assertThrows(IllegalStateException.class, () -> shareGroup.addPartitionEpochs( + mkAssignment( + mkTopicAssignment(fooTopicId, 1) + ), + 11 + )); + } + + @Test + public void testDeletingMemberRemovesPartitionEpoch() { + Uuid fooTopicId = Uuid.randomUuid(); + + ShareGroup shareGroup = createShareGroup("foo"); + ShareGroupMember member; + + member = new ShareGroupMember.Builder("member") + .setMemberEpoch(10) + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 1, 2, 3))) + .build(); + + shareGroup.updateMember(member); + + assertEquals(10, shareGroup.currentPartitionEpoch(fooTopicId, 1)); + assertEquals(10, shareGroup.currentPartitionEpoch(fooTopicId, 2)); + assertEquals(10, shareGroup.currentPartitionEpoch(fooTopicId, 3)); + + shareGroup.removeMember(member.memberId()); + + assertEquals(-1, shareGroup.currentPartitionEpoch(fooTopicId, 1)); + assertEquals(-1, shareGroup.currentPartitionEpoch(fooTopicId, 2)); + assertEquals(-1, shareGroup.currentPartitionEpoch(fooTopicId, 3)); + } + + @Test + public void testGroupState() { + ShareGroup shareGroup = createShareGroup("foo"); + assertEquals(ShareGroup.ShareGroupState.EMPTY, shareGroup.state()); + assertEquals("Empty", shareGroup.stateAsString()); + + ShareGroupMember member1 = new ShareGroupMember.Builder("member1") + .setState(MemberState.STABLE) + .setMemberEpoch(1) + .setPreviousMemberEpoch(0) + .build(); + + shareGroup.updateMember(member1); + shareGroup.setGroupEpoch(1); + + assertEquals(MemberState.STABLE, member1.state()); + assertEquals(ShareGroupState.STABLE, shareGroup.state()); + assertEquals("Stable", shareGroup.stateAsString()); + } + + @Test + public void testGroupTypeFromString() { + assertEquals(Group.GroupType.parse("share"), Group.GroupType.SHARE); + // Test case insensitivity. + assertEquals(Group.GroupType.parse("Share"), Group.GroupType.SHARE); + assertEquals(Group.GroupType.parse("SHare"), Group.GroupType.SHARE); + } + + @Test + public void testUpdateSubscriptionMetadata() { + Uuid fooTopicId = Uuid.randomUuid(); + Uuid barTopicId = Uuid.randomUuid(); + Uuid zarTopicId = Uuid.randomUuid(); + + MetadataImage image = new MetadataImageBuilder() + .addTopic(fooTopicId, "foo", 1) + .addTopic(barTopicId, "bar", 2) + .addTopic(zarTopicId, "zar", 3) + .addRacks() + .build(); + + ShareGroupMember member1 = new ShareGroupMember.Builder("member1") + .setSubscribedTopicNames(Collections.singletonList("foo")) + .build(); + ShareGroupMember member2 = new ShareGroupMember.Builder("member2") + .setSubscribedTopicNames(Collections.singletonList("bar")) + .build(); + ShareGroupMember member3 = new ShareGroupMember.Builder("member3") + .setSubscribedTopicNames(Collections.singletonList("zar")) + .build(); + + ShareGroup shareGroup = createShareGroup("group-foo"); + + // It should be empty by default. + assertEquals( + Collections.emptyMap(), + shareGroup.computeSubscriptionMetadata( + shareGroup.computeSubscribedTopicNames(null, null), + image.topics(), + image.cluster() + ) + ); + + // Compute while taking into account member 1. + assertEquals( + mkMap( + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))) + ), + shareGroup.computeSubscriptionMetadata( + shareGroup.computeSubscribedTopicNames(null, member1), + image.topics(), + image.cluster() + ) + ); + + // Updating the group with member1. + shareGroup.updateMember(member1); + + // It should return foo now. + assertEquals( + mkMap( + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))) + ), + shareGroup.computeSubscriptionMetadata( + shareGroup.computeSubscribedTopicNames(null, null), + image.topics(), + image.cluster() + ) + ); + + // Compute while taking into account removal of member 1. + assertEquals( + Collections.emptyMap(), + shareGroup.computeSubscriptionMetadata( + shareGroup.computeSubscribedTopicNames(member1, null), + image.topics(), + image.cluster() + ) + ); + + // Compute while taking into account member 2. + assertEquals( + mkMap( + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))), + mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))) + ), + shareGroup.computeSubscriptionMetadata( + shareGroup.computeSubscribedTopicNames(null, member2), + image.topics(), + image.cluster() + ) + ); + + // Updating the group with member2. + shareGroup.updateMember(member2); + + // It should return foo and bar. + assertEquals( + mkMap( + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))), + mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))) + ), + shareGroup.computeSubscriptionMetadata( + shareGroup.computeSubscribedTopicNames(null, null), + image.topics(), + image.cluster() + ) + ); + + // Compute while taking into account removal of member 2. + assertEquals( + mkMap( + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))) + ), + shareGroup.computeSubscriptionMetadata( + shareGroup.computeSubscribedTopicNames(member2, null), + image.topics(), + image.cluster() + ) + ); + + // Removing member1 results in returning bar. + assertEquals( + mkMap( + mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))) + ), + shareGroup.computeSubscriptionMetadata( + shareGroup.computeSubscribedTopicNames(member1, null), + image.topics(), + image.cluster() + ) + ); + + // Compute while taking into account member 3. + assertEquals( + mkMap( + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))), + mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))), + mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, mkMapOfPartitionRacks(3))) + ), + shareGroup.computeSubscriptionMetadata( + shareGroup.computeSubscribedTopicNames(null, member3), + image.topics(), + image.cluster() + ) + ); + + // Updating group with member3. + shareGroup.updateMember(member3); + + // It should return foo, bar and zar. + assertEquals( + mkMap( + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))), + mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))), + mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, mkMapOfPartitionRacks(3))) + ), + shareGroup.computeSubscriptionMetadata( + shareGroup.computeSubscribedTopicNames(null, null), + image.topics(), + image.cluster() + ) + ); + + // Compute while taking into account removal of member 1, member 2 and member 3 + assertEquals( + Collections.emptyMap(), + shareGroup.computeSubscriptionMetadata( + shareGroup.computeSubscribedTopicNames(new HashSet<>(Arrays.asList(member1, member2, member3))), + image.topics(), + image.cluster() + ) + ); + + // Compute while taking into account removal of member 2 and member 3. + assertEquals( + mkMap( + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))) + ), + shareGroup.computeSubscriptionMetadata( + shareGroup.computeSubscribedTopicNames(new HashSet<>(Arrays.asList(member2, member3))), + image.topics(), + image.cluster() + ) + ); + + // Compute while taking into account removal of member 1. + assertEquals( + mkMap( + mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))), + mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, mkMapOfPartitionRacks(3))) + ), + shareGroup.computeSubscriptionMetadata( + shareGroup.computeSubscribedTopicNames(Collections.singleton(member1)), + image.topics(), + image.cluster() + ) + ); + + // It should return foo, bar and zar. + assertEquals( + mkMap( + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))), + mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))), + mkEntry("zar", new TopicMetadata(zarTopicId, "zar", 3, mkMapOfPartitionRacks(3))) + ), + shareGroup.computeSubscriptionMetadata( + shareGroup.computeSubscribedTopicNames(Collections.emptySet()), + image.topics(), + image.cluster() + ) + ); + } + + @Test + public void testUpdateSubscribedTopicNamesAndSubscriptionType() { + ShareGroupMember member1 = new ShareGroupMember.Builder("member1") + .setSubscribedTopicNames(Collections.singletonList("foo")) + .build(); + ShareGroupMember member2 = new ShareGroupMember.Builder("member2") + .setSubscribedTopicNames(Arrays.asList("bar", "foo")) + .build(); + ShareGroupMember member3 = new ShareGroupMember.Builder("member3") + .setSubscribedTopicNames(Arrays.asList("bar", "foo")) + .build(); + + ShareGroup shareGroup = createShareGroup("group-foo"); + + // It should be empty by default. + assertEquals( + Collections.emptyMap(), + shareGroup.subscribedTopicNames() + ); + + // It should be Homogeneous by default. + assertEquals( + HOMOGENEOUS, + shareGroup.subscriptionType() + ); + + shareGroup.updateMember(member1); + + // It should be Homogeneous since there is just 1 member + assertEquals( + HOMOGENEOUS, + shareGroup.subscriptionType() + ); + + shareGroup.updateMember(member2); + + assertEquals( + HETEROGENEOUS, + shareGroup.subscriptionType() + ); + + shareGroup.updateMember(member3); + + assertEquals( + HETEROGENEOUS, + shareGroup.subscriptionType() + ); + + shareGroup.removeMember(member1.memberId()); + + assertEquals( + HOMOGENEOUS, + shareGroup.subscriptionType() + ); + + ShareGroupMember member4 = new ShareGroupMember.Builder("member2") + .setSubscribedTopicNames(Arrays.asList("bar", "foo", "zar")) + .build(); + + shareGroup.updateMember(member4); + + assertEquals( + HETEROGENEOUS, + shareGroup.subscriptionType() + ); + } + + @Test + public void testUpdateInvertedAssignment() { + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); + ShareGroup shareGroup = new ShareGroup(snapshotRegistry, "test-group"); + Uuid topicId = Uuid.randomUuid(); + String memberId1 = "member1"; + String memberId2 = "member2"; + + // Initial assignment for member1 + Assignment initialAssignment = new Assignment(Collections.singletonMap( + topicId, + new HashSet<>(Collections.singletonList(0)) + )); + shareGroup.updateTargetAssignment(memberId1, initialAssignment); + + // Verify that partition 0 is assigned to member1. + assertEquals( + mkMap( + mkEntry(topicId, mkMap(mkEntry(0, memberId1))) + ), + shareGroup.invertedTargetAssignment() + ); + + // New assignment for member1 + Assignment newAssignment = new Assignment(Collections.singletonMap( + topicId, + new HashSet<>(Collections.singletonList(1)) + )); + shareGroup.updateTargetAssignment(memberId1, newAssignment); + + // Verify that partition 0 is no longer assigned and partition 1 is assigned to member1 + assertEquals( + mkMap( + mkEntry(topicId, mkMap(mkEntry(1, memberId1))) + ), + shareGroup.invertedTargetAssignment() + ); + + // New assignment for member2 to add partition 1 + Assignment newAssignment2 = new Assignment(Collections.singletonMap( + topicId, + new HashSet<>(Collections.singletonList(1)) + )); + shareGroup.updateTargetAssignment(memberId2, newAssignment2); + + // Verify that partition 1 is assigned to member2 + assertEquals( + mkMap( + mkEntry(topicId, mkMap(mkEntry(1, memberId2))) + ), + shareGroup.invertedTargetAssignment() + ); + + // New assignment for member1 to revoke partition 1 and assign partition 0 + Assignment newAssignment1 = new Assignment(Collections.singletonMap( + topicId, + new HashSet<>(Collections.singletonList(0)) + )); + shareGroup.updateTargetAssignment(memberId1, newAssignment1); + + // Verify that partition 1 is still assigned to member2 and partition 0 is assigned to member1 + assertEquals( + mkMap( + mkEntry(topicId, mkMap( + mkEntry(0, memberId1), + mkEntry(1, memberId2) + )) + ), + shareGroup.invertedTargetAssignment() + ); + + // Test remove target assignment for member1 + shareGroup.removeTargetAssignment(memberId1); + + // Verify that partition 0 is no longer assigned and partition 1 is still assigned to member2 + assertEquals( + mkMap( + mkEntry(topicId, mkMap(mkEntry(1, memberId2))) + ), + shareGroup.invertedTargetAssignment() + ); + } + + @Test + public void testMetadataRefreshDeadline() { + MockTime time = new MockTime(); + ShareGroup shareGroup = createShareGroup("group-foo"); + + // Group epoch starts at 0. + assertEquals(0, shareGroup.groupEpoch()); + + // The refresh time deadline should be empty when the group is created or loaded. + assertTrue(shareGroup.hasMetadataExpired(time.milliseconds())); + assertEquals(0L, shareGroup.metadataRefreshDeadline().deadlineMs); + assertEquals(0, shareGroup.metadataRefreshDeadline().epoch); + + // Set the refresh deadline. The metadata remains valid because the deadline + // has not past and the group epoch is correct. + shareGroup.setMetadataRefreshDeadline(time.milliseconds() + 1000, shareGroup.groupEpoch()); + assertFalse(shareGroup.hasMetadataExpired(time.milliseconds())); + assertEquals(time.milliseconds() + 1000, shareGroup.metadataRefreshDeadline().deadlineMs); + assertEquals(shareGroup.groupEpoch(), shareGroup.metadataRefreshDeadline().epoch); + + // Advance past the deadline. The metadata should have expired. + time.sleep(1001L); + assertTrue(shareGroup.hasMetadataExpired(time.milliseconds())); + + // Set the refresh time deadline with a higher group epoch. The metadata is considered + // as expired because the group epoch attached to the deadline is higher than the + // current group epoch. + shareGroup.setMetadataRefreshDeadline(time.milliseconds() + 1000, shareGroup.groupEpoch() + 1); + assertTrue(shareGroup.hasMetadataExpired(time.milliseconds())); + assertEquals(time.milliseconds() + 1000, shareGroup.metadataRefreshDeadline().deadlineMs); + assertEquals(shareGroup.groupEpoch() + 1, shareGroup.metadataRefreshDeadline().epoch); + + // Advance the group epoch. + shareGroup.setGroupEpoch(shareGroup.groupEpoch() + 1); + + // Set the refresh deadline. The metadata remains valid because the deadline + // has not past and the group epoch is correct. + shareGroup.setMetadataRefreshDeadline(time.milliseconds() + 1000, shareGroup.groupEpoch()); + assertFalse(shareGroup.hasMetadataExpired(time.milliseconds())); + assertEquals(time.milliseconds() + 1000, shareGroup.metadataRefreshDeadline().deadlineMs); + assertEquals(shareGroup.groupEpoch(), shareGroup.metadataRefreshDeadline().epoch); + + // Request metadata refresh. The metadata expires immediately. + shareGroup.requestMetadataRefresh(); + assertTrue(shareGroup.hasMetadataExpired(time.milliseconds())); + assertEquals(0L, shareGroup.metadataRefreshDeadline().deadlineMs); + assertEquals(0, shareGroup.metadataRefreshDeadline().epoch); + } + + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT) + public void testValidateOffsetCommit(short version) { + ShareGroup shareGroup = createShareGroup("group-foo"); + assertThrows(UnsupportedOperationException.class, () -> + shareGroup.validateOffsetCommit(null, null, -1, false, version)); + } + + @Test + public void testAsListedGroup() { + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); + ShareGroup shareGroup = new ShareGroup(snapshotRegistry, "group-foo"); + snapshotRegistry.getOrCreateSnapshot(0); + assertEquals(ShareGroupState.EMPTY, shareGroup.state(0)); + assertEquals("Empty", shareGroup.stateAsString(0)); + shareGroup.updateMember(new ShareGroupMember.Builder("member1") + .setSubscribedTopicNames(Collections.singletonList("foo")) + .build()); + snapshotRegistry.getOrCreateSnapshot(1); + assertEquals(ShareGroupState.EMPTY, shareGroup.state(0)); + assertEquals("Empty", shareGroup.stateAsString(0)); + assertEquals(ShareGroupState.STABLE, shareGroup.state(1)); + assertEquals("Stable", shareGroup.stateAsString(1)); + } + + @Test + public void testOffsetExpirationCondition() { + ShareGroup shareGroup = createShareGroup("group-foo"); + assertThrows(UnsupportedOperationException.class, shareGroup::offsetExpirationCondition); + } + + @Test + public void testValidateOffsetFetch() { + ShareGroup shareGroup = createShareGroup("group-foo"); + assertThrows(UnsupportedOperationException.class, () -> + shareGroup.validateOffsetFetch(null, -1, -1)); + } + + @Test + public void testValidateOffsetDelete() { + ShareGroup shareGroup = createShareGroup("group-foo"); + assertThrows(UnsupportedOperationException.class, shareGroup::validateOffsetDelete); + } + + @Test + public void testValidateDeleteGroup() { + ShareGroup shareGroup = createShareGroup("foo"); + + assertEquals(ShareGroupState.EMPTY, shareGroup.state()); + assertDoesNotThrow(shareGroup::validateDeleteGroup); + + ShareGroupMember member1 = new ShareGroupMember.Builder("member1") + .setMemberEpoch(1) + .setPreviousMemberEpoch(0) + .build(); + shareGroup.updateMember(member1); + + assertEquals(ShareGroupState.STABLE, shareGroup.state()); + assertThrows(GroupNotEmptyException.class, shareGroup::validateDeleteGroup); + + shareGroup.setGroupEpoch(1); + + assertEquals(ShareGroupState.STABLE, shareGroup.state()); + assertThrows(GroupNotEmptyException.class, shareGroup::validateDeleteGroup); + + shareGroup.setTargetAssignmentEpoch(1); + + assertEquals(ShareGroupState.STABLE, shareGroup.state()); + assertThrows(GroupNotEmptyException.class, shareGroup::validateDeleteGroup); + } + + @Test + public void testIsSubscribedToTopic() { + Uuid fooTopicId = Uuid.randomUuid(); + Uuid barTopicId = Uuid.randomUuid(); + + MetadataImage image = new MetadataImageBuilder() + .addTopic(fooTopicId, "foo", 1) + .addTopic(barTopicId, "bar", 2) + .addRacks() + .build(); + + ShareGroupMember member1 = new ShareGroupMember.Builder("member1") + .setSubscribedTopicNames(Collections.singletonList("foo")) + .build(); + ShareGroupMember member2 = new ShareGroupMember.Builder("member2") + .setSubscribedTopicNames(Collections.singletonList("bar")) + .build(); + + ShareGroup shareGroup = createShareGroup("group-foo"); + + shareGroup.updateMember(member1); + shareGroup.updateMember(member2); + + assertEquals( + mkMap( + mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1, mkMapOfPartitionRacks(1))), + mkEntry("bar", new TopicMetadata(barTopicId, "bar", 2, mkMapOfPartitionRacks(2))) + ), + shareGroup.computeSubscriptionMetadata( + shareGroup.computeSubscribedTopicNames(null, null), + image.topics(), + image.cluster() + ) + ); + + assertTrue(shareGroup.isSubscribedToTopic("foo")); + assertTrue(shareGroup.isSubscribedToTopic("bar")); + + shareGroup.removeMember("member1"); + assertFalse(shareGroup.isSubscribedToTopic("foo")); + + shareGroup.removeMember("member2"); + assertFalse(shareGroup.isSubscribedToTopic("bar")); + } + + @Test + public void testAsDescribedGroup() { + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); + ShareGroup shareGroup = new ShareGroup(snapshotRegistry, "group-id-1"); + snapshotRegistry.getOrCreateSnapshot(0); + assertEquals(ShareGroupState.EMPTY.toString(), shareGroup.stateAsString(0)); + + shareGroup.updateMember(new ShareGroupMember.Builder("member1") + .setSubscribedTopicNames(Collections.singletonList("foo")) + .build()); + shareGroup.updateMember(new ShareGroupMember.Builder("member2") + .build()); + snapshotRegistry.getOrCreateSnapshot(1); + + ShareGroupDescribeResponseData.DescribedGroup expected = new ShareGroupDescribeResponseData.DescribedGroup() + .setGroupId("group-id-1") + .setGroupState(ShareGroupState.STABLE.toString()) + .setGroupEpoch(0) + .setAssignmentEpoch(0) + .setAssignorName("assignorName") + .setMembers(Arrays.asList( + new ShareGroupDescribeResponseData.Member() + .setMemberId("member1") + .setSubscribedTopicNames(Collections.singletonList("foo")), + new ShareGroupDescribeResponseData.Member().setMemberId("member2") + )); + ShareGroupDescribeResponseData.DescribedGroup actual = shareGroup.asDescribedGroup(1, "assignorName", + new MetadataImageBuilder().build().topics()); + + assertEquals(expected, actual); + } + + @Test + public void testIsInStatesCaseInsensitive() { + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); + ShareGroup shareGroup = new ShareGroup(snapshotRegistry, "group-foo"); + snapshotRegistry.getOrCreateSnapshot(0); + assertTrue(shareGroup.isInStates(Collections.singleton("empty"), 0)); + assertFalse(shareGroup.isInStates(Collections.singleton("Empty"), 0)); + + shareGroup.updateMember(new ShareGroupMember.Builder("member1") + .setSubscribedTopicNames(Collections.singletonList("foo")) + .build()); + snapshotRegistry.getOrCreateSnapshot(1); + assertTrue(shareGroup.isInStates(Collections.singleton("empty"), 0)); + assertTrue(shareGroup.isInStates(Collections.singleton("stable"), 1)); + assertFalse(shareGroup.isInStates(Collections.singleton("empty"), 1)); + } + + private ShareGroup createShareGroup(String groupId) { + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); + return new ShareGroup( + snapshotRegistry, + groupId + ); + } +}