diff --git a/group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/GroupSpec.java b/group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/GroupSpec.java index 54b5690d74d..3f2d2ca2571 100644 --- a/group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/GroupSpec.java +++ b/group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/GroupSpec.java @@ -35,11 +35,27 @@ public interface GroupSpec { SubscriptionType subscriptionType(); /** + * Determine whether a topic id and partition have been assigned to + * a member. This method functions the same for all types of groups. + * + * @param topicId Uuid corresponding to the partition's topic. + * @param partitionId Partition Id within topic. * @return True, if the partition is currently assigned to a member. * False, otherwise. */ boolean isPartitionAssigned(Uuid topicId, int partitionId); + /** + * For share groups, a partition can only be assigned once its initialization is complete. + * For other group types, this initialization is not required and all partitions returned + * by the SubscribedTopicDescriber are always assignable. + * + * @param topicId Uuid corresponding to the partition's topic. + * @param partitionId Partition Id within topic. + * @return True, if the partition is assignable. + */ + boolean isPartitionAssignable(Uuid topicId, int partitionId); + /** * Gets the member subscription specification for a member. * diff --git a/group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/SubscribedTopicDescriber.java b/group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/SubscribedTopicDescriber.java index 8d229a3b988..2373162c2c9 100644 --- a/group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/SubscribedTopicDescriber.java +++ b/group-coordinator/group-coordinator-api/src/main/java/org/apache/kafka/coordinator/group/api/assignor/SubscribedTopicDescriber.java @@ -43,14 +43,4 @@ public interface SubscribedTopicDescriber { * If the topic Id does not exist, an empty set is returned. */ Set racksForPartition(Uuid topicId, int partition); - - /** - * Returns a set of partitions corresponding to a topic id which - * are allowlisted based on some criteria. For example, for share groups - * only partitions which have been initialized are returned. - * - * @param topicId The uuid of the topic - * @return The set of integers representing assignable partitions. Could be empty, or contain all partitions. - */ - Set assignablePartitions(Uuid topicId); } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java index 5509055cb39..da04257edf6 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java @@ -90,11 +90,11 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor { // Subscribed topic partitions for the share group. List targetPartitions = computeTargetPartitions( - subscribedTopicIds, subscribedTopicDescriber); + groupSpec, subscribedTopicIds, subscribedTopicDescriber); // The current assignment from topic partition to members. Map> currentAssignment = currentAssignment(groupSpec); - return newAssignmentHomogeneous(groupSpec, subscribedTopicIds, targetPartitions, currentAssignment, subscribedTopicDescriber); + return newAssignmentHomogeneous(groupSpec, subscribedTopicIds, targetPartitions, currentAssignment); } private GroupAssignment assignHeterogeneous( @@ -109,13 +109,13 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor { // Subscribed topic partitions for the share group member. List targetPartitions = computeTargetPartitions( - spec.subscribedTopicIds(), subscribedTopicDescriber); + groupSpec, spec.subscribedTopicIds(), subscribedTopicDescriber); memberToPartitionsSubscription.put(memberId, targetPartitions); } // The current assignment from topic partition to members. Map> currentAssignment = currentAssignment(groupSpec); - return newAssignmentHeterogeneous(groupSpec, memberToPartitionsSubscription, currentAssignment, subscribedTopicDescriber); + return newAssignmentHeterogeneous(groupSpec, memberToPartitionsSubscription, currentAssignment); } /** @@ -146,8 +146,7 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor { GroupSpec groupSpec, Set subscribedTopicIds, List targetPartitions, - Map> currentAssignment, - SubscribedTopicDescriber subscribedTopicDescriber + Map> currentAssignment ) { // For entirely balanced assignment, we would expect (numTargetPartitions / numGroupMembers) partitions per member, rounded upwards. // That can be expressed as Math.ceil(numTargetPartitions / (double) numGroupMembers) @@ -196,7 +195,7 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor { .filter(targetPartition -> !finalAssignmentByPartition.containsKey(targetPartition)) .toList(); - roundRobinAssignmentWithCount(groupSpec.memberIds(), unassignedPartitions, finalAssignment, desiredAssignmentCount, subscribedTopicDescriber); + roundRobinAssignmentWithCount(groupSpec.memberIds(), unassignedPartitions, finalAssignment, desiredAssignmentCount); return groupAssignment(finalAssignment, groupSpec.memberIds()); } @@ -211,8 +210,7 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor { private GroupAssignment newAssignmentHeterogeneous( GroupSpec groupSpec, Map> memberToPartitionsSubscription, - Map> currentAssignment, - SubscribedTopicDescriber subscribedTopicDescriber + Map> currentAssignment ) { int numGroupMembers = groupSpec.memberIds().size(); @@ -240,7 +238,7 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor { }); unassignedPartitions.keySet().forEach(unassignedTopic -> - roundRobinAssignment(topicToMemberSubscription.get(unassignedTopic), unassignedPartitions.get(unassignedTopic), newAssignment, subscribedTopicDescriber)); + roundRobinAssignment(topicToMemberSubscription.get(unassignedTopic), unassignedPartitions.get(unassignedTopic), newAssignment)); // Step 3: We combine current assignment and new assignment. Map> finalAssignment = newHashMap(numGroupMembers); @@ -288,14 +286,12 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor { * @param partitionsToAssign The subscribed topic partitions which needs assignment. * @param assignment The existing assignment by topic partition. We need to pass it as a parameter because this * method can be called multiple times for heterogeneous assignment. - * @param subscribedTopicDescriber The topic describer to fetch assignable partitions from. */ // Visible for testing void roundRobinAssignment( Collection memberIds, List partitionsToAssign, - Map> assignment, - SubscribedTopicDescriber subscribedTopicDescriber + Map> assignment ) { // We iterate through the target partitions and assign a memberId to them. In case we run out of members (members < targetPartitions), // we again start from the starting index of memberIds. @@ -305,9 +301,7 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor { memberIdIterator = memberIds.iterator(); } String memberId = memberIdIterator.next(); - if (subscribedTopicDescriber.assignablePartitions(topicPartition.topicId()).contains(topicPartition.partitionId())) { - assignment.computeIfAbsent(topicPartition, k -> new ArrayList<>()).add(memberId); - } + assignment.computeIfAbsent(topicPartition, k -> new ArrayList<>()).add(memberId); } } @@ -320,14 +314,12 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor { * @param desiredAssignmentCount The number of partitions which can be assigned to each member to give even balance. * Note that this number can be exceeded by one to allow for situations * in which we have hashing collisions. - * @param subscribedTopicDescriber The topic describer to fetch assignable partitions from. */ void roundRobinAssignmentWithCount( Collection memberIds, List partitionsToAssign, Map> assignment, - int desiredAssignmentCount, - SubscribedTopicDescriber subscribedTopicDescriber + int desiredAssignmentCount ) { Collection memberIdsCopy = new LinkedHashSet<>(memberIds); @@ -337,9 +329,6 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor { ListIterator partitionListIterator = partitionsToAssign.listIterator(); while (partitionListIterator.hasNext()) { TopicIdPartition partition = partitionListIterator.next(); - if (!subscribedTopicDescriber.assignablePartitions(partition.topicId()).contains(partition.partitionId())) { - continue; - } if (!memberIdIterator.hasNext()) { memberIdIterator = memberIdsCopy.iterator(); if (memberIdsCopy.isEmpty()) { @@ -360,6 +349,7 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor { } private List computeTargetPartitions( + GroupSpec groupSpec, Set subscribedTopicIds, SubscribedTopicDescriber subscribedTopicDescriber ) { @@ -373,13 +363,11 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor { ); } - // Since we are returning a list here, we can keep it sorted - // to add determinism while testing and iterating. - targetPartitions.addAll(subscribedTopicDescriber.assignablePartitions(topicId).stream() - .sorted() - .map(partition -> new TopicIdPartition(topicId, partition)) - .toList() - ); + for (int partition = 0; partition < numPartitions; partition++) { + if (groupSpec.isPartitionAssignable(topicId, partition)) { + targetPartitions.add(new TopicIdPartition(topicId, partition)); + } + } }); return targetPartitions; } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/GroupSpecImpl.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/GroupSpecImpl.java index 5e7356bcfe6..33af7ee9978 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/GroupSpecImpl.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/GroupSpecImpl.java @@ -25,6 +25,8 @@ import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType; import java.util.Collection; import java.util.Map; import java.util.Objects; +import java.util.Optional; +import java.util.Set; /** * The assignment specification for a modern group. @@ -46,14 +48,31 @@ public class GroupSpecImpl implements GroupSpec { */ private final Map> invertedMemberAssignment; + /** + * In case of share groups, this map will be queried to decide + * which partition is assignable. For non-share groups, + * this optional should be empty. + */ + private final Optional>> topicPartitionAllowedMap; + public GroupSpecImpl( Map members, SubscriptionType subscriptionType, Map> invertedMemberAssignment + ) { + this(members, subscriptionType, invertedMemberAssignment, Optional.empty()); + } + + public GroupSpecImpl( + Map members, + SubscriptionType subscriptionType, + Map> invertedMemberAssignment, + Optional>> topicPartitionAllowedMap ) { this.members = Objects.requireNonNull(members); this.subscriptionType = Objects.requireNonNull(subscriptionType); this.invertedMemberAssignment = Objects.requireNonNull(invertedMemberAssignment); + this.topicPartitionAllowedMap = Objects.requireNonNull(topicPartitionAllowedMap); } /** @@ -84,6 +103,14 @@ public class GroupSpecImpl implements GroupSpec { return partitionMap.containsKey(partitionId); } + /** + * {@inheritDoc} + */ + @Override + public boolean isPartitionAssignable(Uuid topicId, int partitionId) { + return topicPartitionAllowedMap.map(allowedMap -> allowedMap.containsKey(topicId) && allowedMap.get(topicId).contains(partitionId)).orElse(true); + } + /** * {@inheritDoc} */ @@ -111,26 +138,25 @@ public class GroupSpecImpl implements GroupSpec { @Override public boolean equals(Object o) { if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - GroupSpecImpl that = (GroupSpecImpl) o; - return subscriptionType == that.subscriptionType && - members.equals(that.members) && - invertedMemberAssignment.equals(that.invertedMemberAssignment); + if (!(o instanceof GroupSpecImpl groupSpec)) return false; + return Objects.equals(members, groupSpec.members) && + subscriptionType == groupSpec.subscriptionType && + Objects.equals(invertedMemberAssignment, groupSpec.invertedMemberAssignment) && + Objects.equals(topicPartitionAllowedMap, groupSpec.topicPartitionAllowedMap); } @Override public int hashCode() { - int result = members.hashCode(); - result = 31 * result + subscriptionType.hashCode(); - result = 31 * result + invertedMemberAssignment.hashCode(); - return result; + return Objects.hash(members, subscriptionType, invertedMemberAssignment, topicPartitionAllowedMap); } @Override public String toString() { - return "GroupSpecImpl(members=" + members + + return "GroupSpecImpl(" + + "members=" + members + ", subscriptionType=" + subscriptionType + ", invertedMemberAssignment=" + invertedMemberAssignment + + ", topicPartitionAllowedMap=" + topicPartitionAllowedMap + ')'; } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java index d8237724b74..6be1bbfc99d 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicDescriberImpl.java @@ -26,9 +26,7 @@ import org.apache.kafka.metadata.PartitionRegistration; import java.util.Collections; import java.util.HashSet; -import java.util.Map; import java.util.Objects; -import java.util.Optional; import java.util.Set; /** @@ -36,27 +34,13 @@ import java.util.Set; * topic and partition metadata for the topics that the modern group is subscribed to. */ public class SubscribedTopicDescriberImpl implements SubscribedTopicDescriber { - /** - * The map of topic Ids to the set of allowed partitions for each topic. - * If this is empty, all partitions are allowed. - */ - private final Optional>> topicPartitionAllowedMap; - /** * The metadata image that contains the latest metadata information. */ private final MetadataImage metadataImage; public SubscribedTopicDescriberImpl(MetadataImage metadataImage) { - this(metadataImage, Optional.empty()); - } - - public SubscribedTopicDescriberImpl( - MetadataImage metadataImage, - Optional>> topicPartitionAllowedMap - ) { this.metadataImage = Objects.requireNonNull(metadataImage); - this.topicPartitionAllowedMap = Objects.requireNonNull(topicPartitionAllowedMap); } /** @@ -100,50 +84,23 @@ public class SubscribedTopicDescriberImpl implements SubscribedTopicDescriber { return Set.of(); } - /** - * Returns a set of assignable partitions from the metadata image. - * If the allowed partition map is Optional.empty(), all the partitions in the corresponding - * topic image are returned for the argument topic id. If allowed map is empty, - * empty set is returned. - * - * @param topicId The uuid of the topic - * @return Set of integers if assignable partitions available, empty otherwise. - */ - @Override - public Set assignablePartitions(Uuid topicId) { - TopicImage topic = metadataImage.topics().getTopic(topicId); - if (topic == null) { - return Set.of(); - } - - if (topicPartitionAllowedMap.isEmpty()) { - return Collections.unmodifiableSet(topic.partitions().keySet()); - } - - return topicPartitionAllowedMap.get().getOrDefault(topicId, Set.of()); - } - @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; SubscribedTopicDescriberImpl that = (SubscribedTopicDescriberImpl) o; - if (!topicPartitionAllowedMap.equals(that.topicPartitionAllowedMap)) return false; return metadataImage.equals(that.metadataImage); } @Override public int hashCode() { - int result = metadataImage.hashCode(); - result = 31 * result + topicPartitionAllowedMap.hashCode(); - return result; + return Objects.hashCode(metadataImage); } @Override public String toString() { return "SubscribedTopicMetadata(" + "metadataImage=" + metadataImage + - ", topicPartitionAllowedMap=" + topicPartitionAllowedMap + ')'; } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java index ed1755e483e..a46f89de396 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/TargetAssignmentBuilder.java @@ -466,9 +466,10 @@ public abstract class TargetAssignmentBuilder members = new HashMap<>(); + + Set topicsSubscription = new LinkedHashSet<>(); + topicsSubscription.add(TOPIC_1_UUID); + topicsSubscription.add(TOPIC_3_UUID); + + members.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + Optional.empty(), + topicsSubscription, + Assignment.EMPTY + )); + + members.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + Optional.empty(), + topicsSubscription, + Assignment.EMPTY + )); + + GroupSpec groupSpec = new GroupSpecImpl( + members, + HOMOGENEOUS, + Map.of(), + Optional.of( + Map.of( + TOPIC_1_UUID, Set.of(0, 1, 2), + TOPIC_3_UUID, Set.of(0, 1) + ) + ) + ); + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl( + metadataImage + ); + + GroupAssignment computedAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); + + // Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66. + // Step 1 -> T1:0 -> MEMBER_A and T1:1 -> MEMBER_B by hash assignment. + // Step 2 -> T1:2, T3:1 -> MEMBER_A and T3:0 -> MEMBER_B by round-robin assignment. + // Step 3 -> no new assignment gets added by current assignment since it is empty. + Map>> expectedAssignment = new HashMap<>(); + expectedAssignment.put(MEMBER_A, mkAssignment( + mkTopicAssignment(TOPIC_1_UUID, 0, 2), + mkTopicAssignment(TOPIC_3_UUID, 1) + )); + expectedAssignment.put(MEMBER_B, mkAssignment( + mkTopicAssignment(TOPIC_1_UUID, 1), + mkTopicAssignment(TOPIC_3_UUID, 0) + )); + + // T1: 3 partitions + T3: 2 partitions = 5 partitions + assertEveryPartitionGetsAssignment(5, computedAssignment); + assertAssignment(expectedAssignment, computedAssignment); + } + + @Test + public void testAssignWithTwoMembersAndTwoTopicsHomogeneousWithNonAssignableTopic() { + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(TOPIC_1_UUID, TOPIC_1_NAME, 3) + .addTopic(TOPIC_3_UUID, TOPIC_3_NAME, 2) + .build(); + + Map members = new HashMap<>(); + + Set topicsSubscription = new LinkedHashSet<>(); + topicsSubscription.add(TOPIC_1_UUID); + topicsSubscription.add(TOPIC_3_UUID); + + members.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + Optional.empty(), + topicsSubscription, + Assignment.EMPTY + )); + + members.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + Optional.empty(), + topicsSubscription, + Assignment.EMPTY + )); + + GroupSpec groupSpec = new GroupSpecImpl( + members, + HOMOGENEOUS, + Map.of(), + Optional.of( + Map.of(TOPIC_1_UUID, Set.of(0, 1, 2)) + ) + ); + + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl( + metadataImage + ); + + GroupAssignment computedAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); + + Map>> expectedAssignment = new HashMap<>(); + expectedAssignment.put(MEMBER_A, mkAssignment( + mkTopicAssignment(TOPIC_1_UUID, 1, 2) + )); + expectedAssignment.put(MEMBER_B, mkAssignment( + mkTopicAssignment(TOPIC_1_UUID, 0) + )); + + // T1: 3 partitions + T3(non-assignable): 2 partitions = 3 partitions + assertEveryPartitionGetsAssignment(3, computedAssignment); + assertAssignment(expectedAssignment, computedAssignment); + } + @Test public void testAssignWithThreeMembersThreeTopicsHeterogeneous() { MetadataImage metadataImage = new MetadataImageBuilder() @@ -297,6 +417,159 @@ public class SimpleAssignorTest { assertAssignment(expectedAssignment, computedAssignment); } + @Test + public void testAssignWithThreeMembersThreeTopicsHeterogeneousWithAllowedMap() { + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(TOPIC_1_UUID, TOPIC_1_NAME, 3) + .addTopic(TOPIC_2_UUID, TOPIC_2_NAME, 3) + .addTopic(TOPIC_3_UUID, TOPIC_3_NAME, 2) + .build(); + + Set memberATopicsSubscription = new LinkedHashSet<>(); + memberATopicsSubscription.add(TOPIC_1_UUID); + memberATopicsSubscription.add(TOPIC_2_UUID); + + Map members = new HashMap<>(); + members.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + Optional.empty(), + memberATopicsSubscription, + Assignment.EMPTY + )); + + members.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + Optional.empty(), + Set.of(TOPIC_3_UUID), + Assignment.EMPTY + )); + + Set memberCTopicsSubscription = new LinkedHashSet<>(); + memberCTopicsSubscription.add(TOPIC_2_UUID); + memberCTopicsSubscription.add(TOPIC_3_UUID); + members.put(MEMBER_C, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + Optional.empty(), + memberCTopicsSubscription, + Assignment.EMPTY + )); + + GroupSpec groupSpec = new GroupSpecImpl( + members, + HETEROGENEOUS, + Map.of(), + Optional.of( + Map.of( + TOPIC_1_UUID, Set.of(0, 1, 2), + TOPIC_2_UUID, Set.of(0, 1, 2), + TOPIC_3_UUID, Set.of(0, 1) + ) + ) + ); + + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl( + metadataImage + ); + + GroupAssignment computedAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); + + // Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66. Hashcode of MEMBER_C is 67. + // Step 1 -> T2:2 -> member_A, T3:0 -> member_B, T2:2 -> member_C by hash assignment. + // Step 2 -> T1:0, T1:1, T1:2, T2:0 -> member_A, T3:1, -> member_B, T2:1 -> member_C by round-robin assignment. + // Step 3 -> no new assignment gets added by current assignment since it is empty. + Map>> expectedAssignment = new HashMap<>(); + expectedAssignment.put(MEMBER_A, mkAssignment( + mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2), + mkTopicAssignment(TOPIC_2_UUID, 0, 2) + )); + expectedAssignment.put(MEMBER_B, mkAssignment( + mkTopicAssignment(TOPIC_3_UUID, 0, 1) + )); + expectedAssignment.put(MEMBER_C, mkAssignment( + mkTopicAssignment(TOPIC_2_UUID, 1, 2) + )); + + // T1: 3 partitions + T2: 3 partitions + T3: 2 partitions = 8 partitions + assertEveryPartitionGetsAssignment(8, computedAssignment); + assertAssignment(expectedAssignment, computedAssignment); + } + + @Test + public void testAssignWithThreeMembersThreeTopicsHeterogeneousWithNonAssignableTopic() { + MetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(TOPIC_1_UUID, TOPIC_1_NAME, 3) + .addTopic(TOPIC_2_UUID, TOPIC_2_NAME, 3) + .addTopic(TOPIC_3_UUID, TOPIC_3_NAME, 2) // non-assignable + .build(); + + Set memberATopicsSubscription = new LinkedHashSet<>(); + memberATopicsSubscription.add(TOPIC_1_UUID); + memberATopicsSubscription.add(TOPIC_2_UUID); + + Map members = new HashMap<>(); + members.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + Optional.empty(), + memberATopicsSubscription, + Assignment.EMPTY + )); + + members.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + Optional.empty(), + Set.of(TOPIC_3_UUID), + Assignment.EMPTY + )); + + Set memberCTopicsSubscription = new LinkedHashSet<>(); + memberCTopicsSubscription.add(TOPIC_2_UUID); + memberCTopicsSubscription.add(TOPIC_3_UUID); + members.put(MEMBER_C, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + Optional.empty(), + memberCTopicsSubscription, + Assignment.EMPTY + )); + + GroupSpec groupSpec = new GroupSpecImpl( + members, + HETEROGENEOUS, + Map.of(), + Optional.of( + Map.of( + TOPIC_1_UUID, Set.of(0, 1, 2), + TOPIC_2_UUID, Set.of(0, 1, 2) + ) + ) + ); + + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl( + metadataImage + ); + + GroupAssignment computedAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); + + Map>> expectedAssignment = new HashMap<>(); + expectedAssignment.put(MEMBER_A, mkAssignment( + mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2), + mkTopicAssignment(TOPIC_2_UUID, 0, 2) + )); + expectedAssignment.put(MEMBER_B, Map.of()); + expectedAssignment.put(MEMBER_C, mkAssignment( + mkTopicAssignment(TOPIC_2_UUID, 1) + )); + + // T1: 3 partitions + T2: 3 partitions + T3: 2 partitions(non-assignable) = 6 partitions + assertEveryPartitionGetsAssignment(6, computedAssignment); + assertAssignment(expectedAssignment, computedAssignment); + } + @Test public void testAssignWithOneMemberNoAssignedTopicHeterogeneous() { MetadataImage metadataImage = new MetadataImageBuilder() @@ -390,13 +663,7 @@ public class SimpleAssignorTest { Map> assignment = new HashMap<>(); assignment.put(partition1, List.of(member1)); - SubscribedTopicDescriber describer = mock(SubscribedTopicDescriber.class); - when(describer.assignablePartitions(eq(TOPIC_1_UUID))).thenReturn(Set.of(0)); - when(describer.assignablePartitions(eq(TOPIC_2_UUID))).thenReturn(Set.of(0)); - when(describer.assignablePartitions(eq(TOPIC_3_UUID))).thenReturn(Set.of(0)); - when(describer.assignablePartitions(eq(TOPIC_4_UUID))).thenReturn(Set.of(0)); - - assignor.roundRobinAssignment(members, unassignedPartitions, assignment, describer); + assignor.roundRobinAssignment(members, unassignedPartitions, assignment); Map> expectedAssignment = Map.of( partition1, List.of(member1), partition2, List.of(member1), @@ -422,13 +689,7 @@ public class SimpleAssignorTest { assignment.put(member1, new HashSet<>(Set.of(partition1))); assignment.put(member2, new HashSet<>(Set.of(partition1))); - SubscribedTopicDescriber describer = mock(SubscribedTopicDescriber.class); - when(describer.assignablePartitions(eq(TOPIC_1_UUID))).thenReturn(Set.of(0)); - when(describer.assignablePartitions(eq(TOPIC_2_UUID))).thenReturn(Set.of(0)); - when(describer.assignablePartitions(eq(TOPIC_3_UUID))).thenReturn(Set.of(0)); - when(describer.assignablePartitions(eq(TOPIC_4_UUID))).thenReturn(Set.of(0)); - - assignor.roundRobinAssignmentWithCount(members, unassignedPartitions, assignment, 2, describer); + assignor.roundRobinAssignmentWithCount(members, unassignedPartitions, assignment, 2); Map> expectedAssignment = Map.of( member1, Set.of(partition1, partition2, partition4), member2, Set.of(partition1, partition3) @@ -454,14 +715,8 @@ public class SimpleAssignorTest { assignment.put(member1, new HashSet<>(Set.of(partition1))); assignment.put(member2, new HashSet<>(Set.of(partition1))); - SubscribedTopicDescriber describer = mock(SubscribedTopicDescriber.class); - when(describer.assignablePartitions(eq(TOPIC_1_UUID))).thenReturn(Set.of(0)); - when(describer.assignablePartitions(eq(TOPIC_2_UUID))).thenReturn(Set.of(0)); - when(describer.assignablePartitions(eq(TOPIC_3_UUID))).thenReturn(Set.of(0)); - when(describer.assignablePartitions(eq(TOPIC_4_UUID))).thenReturn(Set.of(0, 1, 2)); - assertThrows(PartitionAssignorException.class, - () -> assignor.roundRobinAssignmentWithCount(members, unassignedPartitions, assignment, 2, describer)); + () -> assignor.roundRobinAssignmentWithCount(members, unassignedPartitions, assignment, 2)); } @Test diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicMetadataTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicMetadataTest.java index b2ecfcbce78..b8868a30a7f 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicMetadataTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/SubscribedTopicMetadataTest.java @@ -23,8 +23,6 @@ import org.apache.kafka.image.MetadataImage; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.util.Map; -import java.util.Optional; import java.util.Set; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -55,11 +53,6 @@ public class SubscribedTopicMetadataTest { assertThrows(NullPointerException.class, () -> new SubscribedTopicDescriberImpl(null)); } - @Test - public void testTopicPartitionAllowedMapCannotBeNull() { - assertThrows(NullPointerException.class, () -> new SubscribedTopicDescriberImpl(metadataImage, null)); - } - @Test public void testNumberOfPartitions() { Uuid topicId = Uuid.randomUuid(); @@ -100,32 +93,4 @@ public class SubscribedTopicMetadataTest { .build(); assertNotEquals(new SubscribedTopicDescriberImpl(metadataImage2), subscribedTopicMetadata); } - - @Test - public void testAssignablePartitions() { - String t1Name = "t1"; - Uuid t1Id = Uuid.randomUuid(); - metadataImage = new MetadataImageBuilder().addTopic(t1Id, t1Name, numPartitions).build(); - // Optional.empty() allow map (all partitions assignable) - subscribedTopicMetadata = new SubscribedTopicDescriberImpl(metadataImage, Optional.empty()); - assertEquals(Set.of(0, 1, 2, 3, 4), subscribedTopicMetadata.assignablePartitions(t1Id)); - - // empty allow map (nothing assignable) - subscribedTopicMetadata = new SubscribedTopicDescriberImpl(metadataImage, Optional.of(Map.of())); - assertEquals(Set.of(), subscribedTopicMetadata.assignablePartitions(t1Id)); - - // few assignable partitions - subscribedTopicMetadata = new SubscribedTopicDescriberImpl( - metadataImage, - Optional.of(Map.of(t1Id, Set.of(0, 5))) - ); - assertEquals(Set.of(0, 5), subscribedTopicMetadata.assignablePartitions(t1Id)); - - // all assignable partitions - subscribedTopicMetadata = new SubscribedTopicDescriberImpl( - metadataImage, - Optional.of(Map.of(t1Id, Set.of(0, 1, 2, 3, 4))) - ); - assertEquals(Set.of(0, 1, 2, 3, 4), subscribedTopicMetadata.assignablePartitions(t1Id)); - } }