KAFKA-19362: Finalize homogeneous simple share assignor (#19977)

Finalise the share group SimpleAssignor for homogeneous subscriptions.
The assignor code is much more accurate about the number of partitions
assigned to each member, and the number of members assigned for each
partition. It eliminates the idea of hash-based assignment because that
has been shown to the unhelpful. The revised code is very much more
effective at assigning evenly as the number of members grows and shrinks
over time.

A future PR will address the code for heterogeneous subscriptions.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
This commit is contained in:
Andrew Schofield 2025-06-20 16:10:47 +01:00 committed by GitHub
parent a797d644bc
commit 4690527fab
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 575 additions and 425 deletions

View File

@ -17,10 +17,16 @@
package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
import org.apache.kafka.server.common.TopicIdPartition;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -51,4 +57,54 @@ public final class AssignorHelpers {
}
return copy;
}
/**
* Computes the list of target partitions which can be assigned to members. This list includes all partitions
* for the subscribed topic IDs, with the additional check that they must be assignable.
* @param groupSpec The assignment spec which includes member metadata.
* @param subscribedTopicIds The set of subscribed topic IDs.
* @param subscribedTopicDescriber The topic and partition metadata describer.
* @return The list of target partitions.
*/
static List<TopicIdPartition> computeTargetPartitions(
GroupSpec groupSpec,
Set<Uuid> subscribedTopicIds,
SubscribedTopicDescriber subscribedTopicDescriber
) {
List<TopicIdPartition> targetPartitions = new ArrayList<>();
subscribedTopicIds.forEach(topicId -> {
int numPartitions = subscribedTopicDescriber.numPartitions(topicId);
if (numPartitions == -1) {
throw new PartitionAssignorException(
"Members are subscribed to topic " + topicId + " which doesn't exist in the topic metadata."
);
}
for (int partition = 0; partition < numPartitions; partition++) {
if (groupSpec.isPartitionAssignable(topicId, partition)) {
targetPartitions.add(new TopicIdPartition(topicId, partition));
}
}
});
return targetPartitions;
}
/**
* Constructs a HashNap with a known capacity. This is equivalent to HashMap.newHashMap which is introduced in Java 19.
* @param numMappings The expected number of mappings.
* @return The newly created map.
*/
static <K, V> HashMap<K, V> newHashMap(int numMappings) {
return new HashMap<>((int) (((numMappings + 1) / 0.75f) + 1));
}
/**
* Constructs a HashSet with a known capacity. This is equivalent to HashSet.newHashSet which is introduced in Java 19.
* @param numElements The expected number of elements.
* @return The newly created set.
*/
static <K> HashSet<K> newHashSet(int numElements) {
return new HashSet<>((int) (((numElements + 1) / 0.75f) + 1));
}
}

View File

@ -27,6 +27,9 @@ import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl;
import org.apache.kafka.server.common.TopicIdPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@ -34,7 +37,6 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
@ -42,7 +44,7 @@ import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.H
/**
* A simple partition assignor for share groups that assigns partitions of the subscribed topics
* based on the rules defined in KIP-932 to different members. It is not rack-aware.
* to different members based on the rules defined in KIP-932. It is not rack-aware.
* <p>
* Assignments are done according to the following principles:
* <ol>
@ -56,51 +58,39 @@ import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.H
* Balance is prioritized above stickiness.
*/
public class SimpleAssignor implements ShareGroupPartitionAssignor {
private static final Logger log = LoggerFactory.getLogger(SimpleAssignor.class);
private static final String SIMPLE_ASSIGNOR_NAME = "simple";
/**
* Unique name for this assignor.
*/
@Override
public String name() {
return SIMPLE_ASSIGNOR_NAME;
}
/**
* Assigns partitions to group members based on the given assignment specification and topic metadata.
*
* @param groupSpec The assignment spec which includes member metadata.
* @param subscribedTopicDescriber The topic and partition metadata describer.
* @return The new assignment for the group.
*/
@Override
public GroupAssignment assign(
GroupSpec groupSpec,
SubscribedTopicDescriber subscribedTopicDescriber
) throws PartitionAssignorException {
public GroupAssignment assign(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) throws PartitionAssignorException {
if (groupSpec.memberIds().isEmpty())
return new GroupAssignment(Map.of());
if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
return assignHomogeneous(groupSpec, subscribedTopicDescriber);
log.debug("Detected that all members are subscribed to the same set of topics, invoking the homogeneous assignment algorithm");
return new SimpleHomogeneousAssignmentBuilder(groupSpec, subscribedTopicDescriber).build();
} else {
log.debug("Detected that the members are subscribed to different sets of topics, invoking the heterogeneous assignment algorithm");
return assignHeterogeneous(groupSpec, subscribedTopicDescriber);
}
}
private GroupAssignment assignHomogeneous(
GroupSpec groupSpec,
SubscribedTopicDescriber subscribedTopicDescriber
) {
Set<Uuid> subscribedTopicIds = groupSpec.memberSubscription(groupSpec.memberIds().iterator().next())
.subscribedTopicIds();
if (subscribedTopicIds.isEmpty())
return new GroupAssignment(Map.of());
// Subscribed topic partitions for the share group.
List<TopicIdPartition> targetPartitions = computeTargetPartitions(
groupSpec, subscribedTopicIds, subscribedTopicDescriber);
// The current assignment from topic partition to members.
Map<TopicIdPartition, List<String>> currentAssignment = currentAssignment(groupSpec);
return newAssignmentHomogeneous(groupSpec, subscribedTopicIds, targetPartitions, currentAssignment);
}
private GroupAssignment assignHeterogeneous(
GroupSpec groupSpec,
SubscribedTopicDescriber subscribedTopicDescriber
) {
private GroupAssignment assignHeterogeneous(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
Map<String, List<TopicIdPartition>> memberToPartitionsSubscription = new HashMap<>();
for (String memberId : groupSpec.memberIds()) {
MemberSubscription spec = groupSpec.memberSubscription(memberId);
@ -108,22 +98,23 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor {
continue;
// Subscribed topic partitions for the share group member.
List<TopicIdPartition> targetPartitions = computeTargetPartitions(
groupSpec, spec.subscribedTopicIds(), subscribedTopicDescriber);
List<TopicIdPartition> targetPartitions = AssignorHelpers.computeTargetPartitions(groupSpec, spec.subscribedTopicIds(), subscribedTopicDescriber);
memberToPartitionsSubscription.put(memberId, targetPartitions);
}
// The current assignment from topic partition to members.
Map<TopicIdPartition, List<String>> currentAssignment = currentAssignment(groupSpec);
return newAssignmentHeterogeneous(groupSpec, memberToPartitionsSubscription, currentAssignment);
}
/**
* Get the current assignment by topic partitions.
*
* @param groupSpec The group metadata specifications.
* @return the current assignment for subscribed topic partitions to memberIds.
*/
private Map<TopicIdPartition, List<String>> currentAssignment(GroupSpec groupSpec) {
static Map<TopicIdPartition, List<String>> currentAssignment(GroupSpec groupSpec) {
Map<TopicIdPartition, List<String>> assignment = new HashMap<>();
for (String member : groupSpec.memberIds()) {
@ -131,80 +122,16 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor {
assignedTopicPartitions.forEach((topicId, partitions) -> partitions.forEach(
partition -> assignment.computeIfAbsent(new TopicIdPartition(topicId, partition), k -> new ArrayList<>()).add(member)));
}
return assignment;
}
/**
* This function computes the new assignment for a homogeneous group.
* @param groupSpec The group metadata specifications.
* @param subscribedTopicIds The set of all the subscribed topic ids for the group.
* @param targetPartitions The list of all topic partitions that need assignment.
* @param currentAssignment The current assignment for subscribed topic partitions to memberIds.
* @return the new partition assignment for the members of the group.
*/
private GroupAssignment newAssignmentHomogeneous(
GroupSpec groupSpec,
Set<Uuid> subscribedTopicIds,
List<TopicIdPartition> targetPartitions,
Map<TopicIdPartition, List<String>> currentAssignment
) {
// For entirely balanced assignment, we would expect (numTargetPartitions / numGroupMembers) partitions per member, rounded upwards.
// That can be expressed as Math.ceil(numTargetPartitions / (double) numGroupMembers)
// Using integer arithmetic, as (numTargetPartitions + numGroupMembers - 1) / numGroupMembers
int numGroupMembers = groupSpec.memberIds().size();
int numTargetPartitions = targetPartitions.size();
int desiredAssignmentCount = (numTargetPartitions + numGroupMembers - 1) / numGroupMembers;
Map<TopicIdPartition, List<String>> newAssignment = newHashMap(numTargetPartitions);
// Hash member IDs to topic partitions. Each member will be assigned one partition, but some partitions
// might have been assigned to more than one member.
memberHashAssignment(groupSpec.memberIds(), targetPartitions, newAssignment);
// Combine current and new hashed assignments, sized to accommodate the expected number of mappings.
Map<String, Set<TopicIdPartition>> finalAssignment = newHashMap(numGroupMembers);
Map<TopicIdPartition, Set<String>> finalAssignmentByPartition = newHashMap(numTargetPartitions);
// First, take the members assigned by hashing.
newAssignment.forEach((targetPartition, members) -> members.forEach(member -> {
finalAssignment.computeIfAbsent(member, k -> new HashSet<>()).add(targetPartition);
finalAssignmentByPartition.computeIfAbsent(targetPartition, k -> new HashSet<>()).add(member);
}));
// Then, take the members from the current assignment, making sure that no member has too many assigned partitions.
// When combining current assignment, we need to only consider the topics in current assignment that are also being
// subscribed in the new assignment as well.
currentAssignment.forEach((targetPartition, members) -> {
if (subscribedTopicIds.contains(targetPartition.topicId())) {
members.forEach(member -> {
if (groupSpec.memberIds().contains(member)) {
Set<TopicIdPartition> memberPartitions = finalAssignment.computeIfAbsent(member, k -> new HashSet<>());
if ((memberPartitions.size() < desiredAssignmentCount) && !newAssignment.containsKey(targetPartition)) {
memberPartitions.add(targetPartition);
finalAssignmentByPartition.computeIfAbsent(targetPartition, k -> new HashSet<>()).add(member);
}
}
});
}
});
// Finally, round-robin assignment for unassigned partitions which do not already have members assigned.
// The order of steps differs slightly from KIP-932 because the desired assignment count has been taken into
// account when copying partitions across from the current assignment, and this is more convenient.
List<TopicIdPartition> unassignedPartitions = targetPartitions.stream()
.filter(targetPartition -> !finalAssignmentByPartition.containsKey(targetPartition))
.toList();
roundRobinAssignmentWithCount(groupSpec.memberIds(), unassignedPartitions, finalAssignment, desiredAssignmentCount);
return groupAssignment(finalAssignment, groupSpec.memberIds());
}
/**
* This function computes the new assignment for a heterogeneous group.
* @param groupSpec The group metadata specifications.
* @param memberToPartitionsSubscription The member to subscribed topic partitions map.
* @param currentAssignment The current assignment for subscribed topic partitions to memberIds.
*
* @param groupSpec The group metadata specifications.
* @param memberToPartitionsSubscription The member to subscribed topic partitions map.
* @param currentAssignment The current assignment for subscribed topic partitions to memberIds.
* @return the new partition assignment for the members of the group.
*/
private GroupAssignment newAssignmentHeterogeneous(
@ -241,7 +168,7 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor {
roundRobinAssignment(topicToMemberSubscription.get(unassignedTopic), unassignedPartitions.get(unassignedTopic), newAssignment));
// Step 3: We combine current assignment and new assignment.
Map<String, Set<TopicIdPartition>> finalAssignment = newHashMap(numGroupMembers);
Map<String, Set<TopicIdPartition>> finalAssignment = AssignorHelpers.newHashMap(numGroupMembers);
newAssignment.forEach((targetPartition, members) -> members.forEach(member ->
finalAssignment.computeIfAbsent(member, k -> new HashSet<>()).add(targetPartition)));
@ -260,10 +187,11 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor {
/**
* This function updates assignment by hashing the member IDs of the members and maps the partitions assigned to the
* members based on the hash, one partition per member. This gives approximately even balance.
* @param memberIds The member ids to which the topic partitions need to be assigned.
* @param partitionsToAssign The subscribed topic partitions which needs assignment.
* @param assignment The existing assignment by topic partition. We need to pass it as a parameter because this
* method can be called multiple times for heterogeneous assignment.
*
* @param memberIds The member ids to which the topic partitions need to be assigned.
* @param partitionsToAssign The subscribed topic partitions which needs assignment.
* @param assignment The existing assignment by topic partition. We need to pass it as a parameter because this
* method can be called multiple times for heterogeneous assignment.
*/
// Visible for testing
void memberHashAssignment(
@ -282,10 +210,11 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor {
/**
* This functions assigns topic partitions to members by a round-robin approach and updates the existing assignment.
* @param memberIds The member ids to which the topic partitions need to be assigned, should be non-empty.
* @param partitionsToAssign The subscribed topic partitions which needs assignment.
* @param assignment The existing assignment by topic partition. We need to pass it as a parameter because this
* method can be called multiple times for heterogeneous assignment.
*
* @param memberIds The member ids to which the topic partitions need to be assigned, should be non-empty.
* @param partitionsToAssign The subscribed topic partitions which needs assignment.
* @param assignment The existing assignment by topic partition. We need to pass it as a parameter because this
* method can be called multiple times for heterogeneous assignment.
*/
// Visible for testing
void roundRobinAssignment(
@ -305,73 +234,6 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor {
}
}
/**
* This functions assigns topic partitions to members by a round-robin approach and updates the existing assignment.
* @param memberIds The member ids to which the topic partitions need to be assigned, should be non-empty.
* @param partitionsToAssign The subscribed topic partitions which need assignment.
* @param assignment The existing assignment by topic partition. We need to pass it as a parameter because this
* method can be called multiple times for heterogeneous assignment.
* @param desiredAssignmentCount The number of partitions which can be assigned to each member to give even balance.
* Note that this number can be exceeded by one to allow for situations
* in which we have hashing collisions.
*/
void roundRobinAssignmentWithCount(
Collection<String> memberIds,
List<TopicIdPartition> partitionsToAssign,
Map<String, Set<TopicIdPartition>> assignment,
int desiredAssignmentCount
) {
Collection<String> memberIdsCopy = new LinkedHashSet<>(memberIds);
// We iterate through the target partitions which are not in the assignment and assign a memberId to them.
// In case we run out of members (memberIds < partitionsToAssign), we again start from the starting index of memberIds.
Iterator<String> memberIdIterator = memberIdsCopy.iterator();
ListIterator<TopicIdPartition> partitionListIterator = partitionsToAssign.listIterator();
while (partitionListIterator.hasNext()) {
TopicIdPartition partition = partitionListIterator.next();
if (!memberIdIterator.hasNext()) {
memberIdIterator = memberIdsCopy.iterator();
if (memberIdsCopy.isEmpty()) {
// This should never happen, but guarding against an infinite loop
throw new PartitionAssignorException("Inconsistent number of member IDs");
}
}
String memberId = memberIdIterator.next();
Set<TopicIdPartition> memberPartitions = assignment.computeIfAbsent(memberId, k -> new HashSet<>());
// We are prepared to add one more partition, even if the desired assignment count is already reached.
if (memberPartitions.size() <= desiredAssignmentCount) {
memberPartitions.add(partition);
} else {
memberIdIterator.remove();
partitionListIterator.previous();
}
}
}
private List<TopicIdPartition> computeTargetPartitions(
GroupSpec groupSpec,
Set<Uuid> subscribedTopicIds,
SubscribedTopicDescriber subscribedTopicDescriber
) {
List<TopicIdPartition> targetPartitions = new ArrayList<>();
subscribedTopicIds.forEach(topicId -> {
int numPartitions = subscribedTopicDescriber.numPartitions(topicId);
if (numPartitions == -1) {
throw new PartitionAssignorException(
"Members are subscribed to topic " + topicId
+ " which doesn't exist in the topic metadata."
);
}
for (int partition = 0; partition < numPartitions; partition++) {
if (groupSpec.isPartitionAssignable(topicId, partition)) {
targetPartitions.add(new TopicIdPartition(topicId, partition));
}
}
});
return targetPartitions;
}
private GroupAssignment groupAssignment(
Map<String, Set<TopicIdPartition>> assignmentByMember,
Collection<String> allGroupMembers
@ -390,8 +252,4 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor {
return new GroupAssignment(members);
}
private static <K, V> HashMap<K, V> newHashMap(int numMappings) {
return new HashMap<>((int) (((numMappings + 1) / 0.75f) + 1));
}
}

View File

@ -0,0 +1,385 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl;
import org.apache.kafka.server.common.TopicIdPartition;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* The homogeneous simple assignment builder is used to generate the target assignment for a share group with
* all its members subscribed to the same set of topics.
* <p>
* Assignments are done according to the following principles:
* <ol>
* <li>Balance: Ensure partitions are distributed equally among all members.
* The difference in assignments sizes between any two members
* should not exceed one partition.</li>
* <li>Stickiness: Minimize partition movements among members by retaining
* as much of the existing assignment as possible.</li>
* </ol>
* <p>
* Balance is prioritized above stickiness.
*/
public class SimpleHomogeneousAssignmentBuilder {
/**
* The list of all the topic Ids that the share group is subscribed to.
*/
private final Set<Uuid> subscribedTopicIds;
/**
* The list of members in the consumer group.
*/
private final List<String> memberIds;
/**
* Maps member ids to their indices in the memberIds list.
*/
private final Map<String, Integer> memberIndices;
/**
* The list of all the topic-partitions assignable for the share group.
*/
private final List<TopicIdPartition> targetPartitions;
/**
* The number of members in the share group.
*/
private final int numGroupMembers;
/**
* The desired sharing for each target partition.
* For entirely balanced assignment, we would expect (numTargetPartitions / numGroupMembers) partitions per member, rounded upwards.
* That can be expressed as: Math.ceil(numTargetPartitions / (double) numGroupMembers)
*/
private final int desiredSharing;
/**
* The desired number of assignments for each share group member.
* <p>
* Members are stored as integer indices into the memberIds array.
*/
private final int[] desiredAssignmentCount;
/**
* The share group assignment from the group metadata specification at the start of the assignment operation.
* <p>
* Members are stored as integer indices into the memberIds array.
*/
private final Map<Integer, Map<Uuid, Set<Integer>>> oldGroupAssignment;
/**
* The share group assignment calculated iteratively by the assignment operation. Entries in this map override those
* in the old group assignment map.
* <p>
* Members are stored as integer indices into the memberIds array.
*/
private final Map<Integer, Map<Uuid, Set<Integer>>> newGroupAssignment;
/**
* The final assignment keyed by topic-partition mapping to member.
* <p>
* Members are stored as integer indices into the memberIds array.
*/
private final Map<TopicIdPartition, Set<Integer>> finalAssignmentByPartition;
/**
* The final assignment keyed by member ID mapping to topic-partitions.
* <p>
* Members are stored as integer indices into the memberIds array.
*/
private final Map<Integer, Set<TopicIdPartition>> finalAssignmentByMember;
/**
* The set of members which have too few assigned partitions.
* <p>
* Members are stored as integer indices into the memberIds array.
*/
private final Set<Integer> unfilledMembers;
/**
* The set of members which have too many assigned partitions.
* <p>
* Members are stored as integer indices into the memberIds array.
*/
private final Set<Integer> overfilledMembers;
public SimpleHomogeneousAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
this.subscribedTopicIds = groupSpec.memberSubscription(groupSpec.memberIds().iterator().next()).subscribedTopicIds();
// Number the members 0 to M - 1.
this.numGroupMembers = groupSpec.memberIds().size();
this.memberIds = new ArrayList<>(groupSpec.memberIds());
this.memberIndices = AssignorHelpers.newHashMap(numGroupMembers);
for (int memberIndex = 0; memberIndex < numGroupMembers; memberIndex++) {
memberIndices.put(memberIds.get(memberIndex), memberIndex);
}
this.targetPartitions = AssignorHelpers.computeTargetPartitions(groupSpec, subscribedTopicIds, subscribedTopicDescriber);
int numTargetPartitions = targetPartitions.size();
if (numTargetPartitions == 0) {
this.desiredSharing = 0;
} else {
this.desiredSharing = (numGroupMembers + numTargetPartitions - 1) / numTargetPartitions;
}
this.desiredAssignmentCount = new int[numGroupMembers];
this.oldGroupAssignment = AssignorHelpers.newHashMap(numGroupMembers);
this.newGroupAssignment = AssignorHelpers.newHashMap(numGroupMembers);
this.finalAssignmentByPartition = AssignorHelpers.newHashMap(numTargetPartitions);
this.finalAssignmentByMember = AssignorHelpers.newHashMap(numGroupMembers);
this.unfilledMembers = AssignorHelpers.newHashSet(numGroupMembers);
this.overfilledMembers = AssignorHelpers.newHashSet(numGroupMembers);
// Extract the old group assignment from the group metadata specification.
groupSpec.memberIds().forEach(memberId -> {
int memberIndex = memberIndices.get(memberId);
oldGroupAssignment.put(memberIndex, groupSpec.memberAssignment(memberId).partitions());
});
// Calculate the desired number of assignments for each member.
// The precise desired assignment count per target partition. This can be a fractional number.
// We would expect (numGroupMembers / numTargetPartitions) assignments per partition, rounded upwards.
// Using integer arithmetic: (numGroupMembers + numTargetPartitions - 1) / numTargetPartitions
double preciseDesiredAssignmentCount = desiredSharing * numTargetPartitions / (double) numGroupMembers;
for (int memberIndex = 0; memberIndex < numGroupMembers; memberIndex++) {
desiredAssignmentCount[memberIndex] =
(int) Math.ceil(preciseDesiredAssignmentCount * (double) (memberIndex + 1)) -
(int) Math.ceil(preciseDesiredAssignmentCount * (double) memberIndex);
}
}
/**
* Here's the step-by-step breakdown of the assignment process:
* <ol>
* <li>Revoke partitions from the existing assignment that are no longer part of each member's subscriptions.</li>
* <li>Revoke partitions from members which have too many partitions.</li>
* <li>Revoke any partitions which are shared more than desired.</li>
* <li>Assign any partitions which have insufficient members assigned.</li>
* </ol>
*/
public GroupAssignment build() {
if (subscribedTopicIds.isEmpty()) {
return new GroupAssignment(Map.of());
}
// The order of steps here is not that significant, but assignRemainingPartitions must go last.
revokeUnassignablePartitions();
revokeOverfilledMembers();
revokeOversharedPartitions();
// Add in any partitions which are currently not in the assignment.
targetPartitions.forEach(topicPartition -> finalAssignmentByPartition.computeIfAbsent(topicPartition, k -> new HashSet<>()));
assignRemainingPartitions();
// Combine the old and the new group assignments to give the result.
Map<String, MemberAssignment> targetAssignment = AssignorHelpers.newHashMap(numGroupMembers);
for (int memberIndex = 0; memberIndex < numGroupMembers; memberIndex++) {
Map<Uuid, Set<Integer>> memberAssignment = newGroupAssignment.get(memberIndex);
if (memberAssignment == null) {
targetAssignment.put(memberIds.get(memberIndex), new MemberAssignmentImpl(oldGroupAssignment.get(memberIndex)));
} else {
targetAssignment.put(memberIds.get(memberIndex), new MemberAssignmentImpl(memberAssignment));
}
}
return new GroupAssignment(targetAssignment);
}
/**
* Examine the members from the current assignment, making sure that no member has too many assigned partitions.
* When looking at the current assignment, we need to only consider the topics in the current assignment that are
* also being subscribed in the new assignment.
*/
private void revokeUnassignablePartitions() {
for (Map.Entry<Integer, Map<Uuid, Set<Integer>>> entry : oldGroupAssignment.entrySet()) {
Integer memberIndex = entry.getKey();
Map<Uuid, Set<Integer>> oldMemberAssignment = entry.getValue();
Map<Uuid, Set<Integer>> newMemberAssignment = null;
int memberAssignedPartitions = 0;
int desiredAssignmentCountForMember = desiredAssignmentCount[memberIndex];
for (Map.Entry<Uuid, Set<Integer>> oldMemberPartitions : oldMemberAssignment.entrySet()) {
Uuid topicId = oldMemberPartitions.getKey();
Set<Integer> assignedPartitions = oldMemberPartitions.getValue();
if (subscribedTopicIds.contains(topicId)) {
for (int partition : assignedPartitions) {
TopicIdPartition topicPartition = new TopicIdPartition(topicId, partition);
memberAssignedPartitions++;
finalAssignmentByPartition.computeIfAbsent(topicPartition, k -> new HashSet<>()).add(memberIndex);
finalAssignmentByMember.computeIfAbsent(memberIndex, k -> new HashSet<>()).add(topicPartition);
if (memberAssignedPartitions >= desiredAssignmentCountForMember) {
if (newMemberAssignment == null) {
// If the new assignment is null, we create a deep copy of the
// original assignment so that we can alter it.
newMemberAssignment = AssignorHelpers.deepCopyAssignment(oldMemberAssignment);
}
}
}
} else {
if (newMemberAssignment == null) {
// If the new member assignment is null, we create a deep copy of the
// original assignment so we can alter it.
newMemberAssignment = AssignorHelpers.deepCopyAssignment(oldMemberAssignment);
}
// Remove the entire topic.
newMemberAssignment.remove(topicId);
}
}
if (newMemberAssignment != null) {
newGroupAssignment.put(memberIndex, newMemberAssignment);
}
if (memberAssignedPartitions < desiredAssignmentCountForMember) {
unfilledMembers.add(memberIndex);
} else if (memberAssignedPartitions > desiredAssignmentCountForMember) {
overfilledMembers.add(memberIndex);
}
}
}
/**
* Revoke partitions from members which are overfilled.
*/
private void revokeOverfilledMembers() {
if (overfilledMembers.isEmpty())
return;
overfilledMembers.forEach(memberIndex -> {
int memberDesiredAssignmentCount = desiredAssignmentCount[memberIndex];
Set<TopicIdPartition> memberFinalAssignment = finalAssignmentByMember.get(memberIndex);
if (memberFinalAssignment.size() > memberDesiredAssignmentCount) {
Iterator<TopicIdPartition> iterator = memberFinalAssignment.iterator();
while (iterator.hasNext()) {
TopicIdPartition topicPartition = iterator.next();
newGroupAssignment.get(memberIndex).get(topicPartition.topicId()).remove(topicPartition.partitionId());
finalAssignmentByPartition.get(topicPartition).remove(memberIndex);
iterator.remove();
if (memberFinalAssignment.size() == memberDesiredAssignmentCount) {
break;
}
}
}
});
}
/**
* Revoke any over-shared partitions.
*/
private void revokeOversharedPartitions() {
finalAssignmentByPartition.forEach((topicPartition, assignedMembers) -> {
int assignedMemberCount = assignedMembers.size();
if (assignedMemberCount > desiredSharing) {
Iterator<Integer> assignedMemberIterator = assignedMembers.iterator();
while (assignedMemberIterator.hasNext()) {
Integer memberIndex = assignedMemberIterator.next();
Map<Uuid, Set<Integer>> newMemberAssignment = newGroupAssignment.get(memberIndex);
if (newMemberAssignment == null) {
newMemberAssignment = AssignorHelpers.deepCopyAssignment(oldGroupAssignment.get(memberIndex));
newGroupAssignment.put(memberIndex, newMemberAssignment);
}
Set<Integer> partitions = newMemberAssignment.get(topicPartition.topicId());
if (partitions != null) {
if (partitions.remove(topicPartition.partitionId())) {
assignedMemberCount--;
assignedMemberIterator.remove();
finalAssignmentByMember.get(memberIndex).remove(topicPartition);
unfilledMembers.add(memberIndex);
}
}
if (assignedMemberCount <= desiredSharing) {
break;
}
}
}
});
}
/**
* Assign partitions to unfilled members. It repeatedly iterates through the unfilled members while running
* once thrown the set of partitions. When a partition is found that has insufficient sharing, it attempts to assign
* to one of the partitions.
* <p>
* There is one tricky cases here and that's where a partition wants another assignment, but none of the unfilled
* members are able to take it (because they already have that partition). In this situation, we just accept that
* no additional assignments for this partition could be made and carry on. In theory, a different shuffling of the
* partitions would be able to achieve better balance, but it's harmless tolerating a slight imbalance in this case.
* <p>
* Note that finalAssignmentByMember is not maintained by this method which is expected to be the final step in the
* computation.
*/
private void assignRemainingPartitions() {
if (unfilledMembers.isEmpty())
return;
Iterator<Integer> memberIterator = unfilledMembers.iterator();
boolean partitionAssignedForThisIterator = false;
for (Map.Entry<TopicIdPartition, Set<Integer>> partitionAssignment : finalAssignmentByPartition.entrySet()) {
TopicIdPartition topicPartition = partitionAssignment.getKey();
Set<Integer> membersAssigned = partitionAssignment.getValue();
if (membersAssigned.size() < desiredSharing) {
int assignmentsToMake = desiredSharing - membersAssigned.size();
while (assignmentsToMake > 0) {
if (!memberIterator.hasNext()) {
if (!partitionAssignedForThisIterator) {
break;
}
memberIterator = unfilledMembers.iterator();
partitionAssignedForThisIterator = false;
}
int memberIndex = memberIterator.next();
if (!membersAssigned.contains(memberIndex)) {
Map<Uuid, Set<Integer>> newMemberAssignment = newGroupAssignment.get(memberIndex);
if (newMemberAssignment == null) {
newMemberAssignment = AssignorHelpers.deepCopyAssignment(oldGroupAssignment.get(memberIndex));
newGroupAssignment.put(memberIndex, newMemberAssignment);
}
newMemberAssignment.computeIfAbsent(topicPartition.topicId(), k -> new HashSet<>()).add(topicPartition.partitionId());
finalAssignmentByMember.computeIfAbsent(memberIndex, k -> new HashSet<>()).add(topicPartition);
assignmentsToMake--;
partitionAssignedForThisIterator = true;
if (finalAssignmentByMember.get(memberIndex).size() >= desiredAssignmentCount[memberIndex]) {
memberIterator.remove();
}
}
}
}
if (unfilledMembers.isEmpty()) {
break;
}
}
}
}

View File

@ -202,23 +202,7 @@ public class SimpleAssignorTest {
subscribedTopicMetadata
);
// Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66.
// Step 1 -> T1:0 -> MEMBER_A and T1:1 -> MEMBER_B by hash assignment.
// Step 2 -> T1:2, T3:1 -> MEMBER_A and T3:0 -> MEMBER_B by round-robin assignment.
// Step 3 -> no new assignment gets added by current assignment since it is empty.
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(MEMBER_A, mkAssignment(
mkTopicAssignment(TOPIC_1_UUID, 0, 2),
mkTopicAssignment(TOPIC_3_UUID, 1)
));
expectedAssignment.put(MEMBER_B, mkAssignment(
mkTopicAssignment(TOPIC_1_UUID, 1),
mkTopicAssignment(TOPIC_3_UUID, 0)
));
// T1: 3 partitions + T3: 2 partitions = 5 partitions
assertEveryPartitionGetsAssignment(5, computedAssignment);
assertAssignment(expectedAssignment, computedAssignment);
}
@Test
@ -268,23 +252,7 @@ public class SimpleAssignorTest {
subscribedTopicMetadata
);
// Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66.
// Step 1 -> T1:0 -> MEMBER_A and T1:1 -> MEMBER_B by hash assignment.
// Step 2 -> T1:2, T3:1 -> MEMBER_A and T3:0 -> MEMBER_B by round-robin assignment.
// Step 3 -> no new assignment gets added by current assignment since it is empty.
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(MEMBER_A, mkAssignment(
mkTopicAssignment(TOPIC_1_UUID, 0, 2),
mkTopicAssignment(TOPIC_3_UUID, 1)
));
expectedAssignment.put(MEMBER_B, mkAssignment(
mkTopicAssignment(TOPIC_1_UUID, 1),
mkTopicAssignment(TOPIC_3_UUID, 0)
));
// T1: 3 partitions + T3: 2 partitions = 5 partitions
assertEveryPartitionGetsAssignment(5, computedAssignment);
assertAssignment(expectedAssignment, computedAssignment);
}
@Test
@ -332,17 +300,7 @@ public class SimpleAssignorTest {
subscribedTopicMetadata
);
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(MEMBER_A, mkAssignment(
mkTopicAssignment(TOPIC_1_UUID, 1, 2)
));
expectedAssignment.put(MEMBER_B, mkAssignment(
mkTopicAssignment(TOPIC_1_UUID, 0)
));
// T1: 3 partitions + T3(non-assignable): 2 partitions = 3 partitions
assertEveryPartitionGetsAssignment(3, computedAssignment);
assertAssignment(expectedAssignment, computedAssignment);
}
@Test
@ -675,182 +633,121 @@ public class SimpleAssignorTest {
}
@Test
public void testRoundRobinAssignmentWithCount() {
String member1 = "member1";
String member2 = "member2";
List<String> members = List.of(member1, member2);
TopicIdPartition partition1 = new TopicIdPartition(TOPIC_1_UUID, 0);
TopicIdPartition partition2 = new TopicIdPartition(TOPIC_2_UUID, 0);
TopicIdPartition partition3 = new TopicIdPartition(TOPIC_3_UUID, 0);
TopicIdPartition partition4 = new TopicIdPartition(TOPIC_4_UUID, 0);
List<TopicIdPartition> unassignedPartitions = List.of(partition2, partition3, partition4);
public void testIncrementalAssignmentIncreasingMembersHomogeneous() {
final int numPartitions = 24;
final int numMembers = 101;
Map<String, Set<TopicIdPartition>> assignment = new HashMap<>();
assignment.put(member1, new HashSet<>(Set.of(partition1)));
assignment.put(member2, new HashSet<>(Set.of(partition1)));
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(TOPIC_1_UUID, TOPIC_1_NAME, numPartitions)
.build();
assignor.roundRobinAssignmentWithCount(members, unassignedPartitions, assignment, 2);
Map<String, Set<TopicIdPartition>> expectedAssignment = Map.of(
member1, Set.of(partition1, partition2, partition4),
member2, Set.of(partition1, partition3)
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
metadataImage
);
assertFinalAssignment(expectedAssignment, assignment);
Set<Uuid> topicsSubscription = new LinkedHashSet<>();
topicsSubscription.add(TOPIC_1_UUID);
Map<String, MemberSubscriptionAndAssignmentImpl> members = new HashMap<>();
SimpleAssignor assignor = new SimpleAssignor();
// Increase the number of members one a time, checking that the partitions are assigned as expected
for (int member = 0; member < numMembers; member++) {
String newMemberId = "M" + member;
members.put(newMemberId, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
topicsSubscription,
Assignment.EMPTY
));
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
new HashMap<>()
);
GroupAssignment computedAssignment = assignor.assign(groupSpec, subscribedTopicMetadata);
assertEveryPartitionGetsAssignment(numPartitions, computedAssignment);
computedAssignment.members().forEach((memberId, partitions) -> members.put(memberId, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
topicsSubscription,
new Assignment(partitions.partitions())
)));
}
}
@Test
public void testRoundRobinAssignmentWithCountTooManyPartitions() {
String member1 = "member1";
String member2 = "member2";
List<String> members = List.of(member1, member2);
TopicIdPartition partition1 = new TopicIdPartition(TOPIC_1_UUID, 0);
TopicIdPartition partition2 = new TopicIdPartition(TOPIC_2_UUID, 0);
TopicIdPartition partition3 = new TopicIdPartition(TOPIC_3_UUID, 0);
TopicIdPartition partition4 = new TopicIdPartition(TOPIC_4_UUID, 0);
TopicIdPartition partition5 = new TopicIdPartition(TOPIC_4_UUID, 1);
TopicIdPartition partition6 = new TopicIdPartition(TOPIC_4_UUID, 2);
List<TopicIdPartition> unassignedPartitions = List.of(partition2, partition3, partition4, partition5, partition6);
public void testIncrementalAssignmentDecreasingMembersHomogeneous() {
final int numPartitions = 24;
final int numMembers = 101;
Map<String, Set<TopicIdPartition>> assignment = new HashMap<>();
assignment.put(member1, new HashSet<>(Set.of(partition1)));
assignment.put(member2, new HashSet<>(Set.of(partition1)));
assertThrows(PartitionAssignorException.class,
() -> assignor.roundRobinAssignmentWithCount(members, unassignedPartitions, assignment, 2));
}
@Test
public void testAssignWithCurrentAssignmentHomogeneous() {
// Current assignment setup - Two members A, B subscribing to T1 and T2.
MetadataImage metadataImage1 = new MetadataImageBuilder()
.addTopic(TOPIC_1_UUID, TOPIC_1_NAME, 3)
.addTopic(TOPIC_2_UUID, TOPIC_2_NAME, 2)
MetadataImage metadataImage = new MetadataImageBuilder()
.addTopic(TOPIC_1_UUID, TOPIC_1_NAME, numPartitions)
.build();
Map<String, MemberSubscriptionAndAssignmentImpl> members1 = new HashMap<>();
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
metadataImage
);
Set<Uuid> topicsSubscription1 = new LinkedHashSet<>();
topicsSubscription1.add(TOPIC_1_UUID);
topicsSubscription1.add(TOPIC_2_UUID);
Set<Uuid> topicsSubscription = new LinkedHashSet<>();
topicsSubscription.add(TOPIC_1_UUID);
Map<String, MemberSubscriptionAndAssignmentImpl> members = new HashMap<>();
members1.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
topicsSubscription1,
Assignment.EMPTY
));
SimpleAssignor assignor = new SimpleAssignor();
members1.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
topicsSubscription1,
Assignment.EMPTY
));
for (int member = 0; member < numMembers; member++) {
String newMemberId = "M" + member;
members.put(newMemberId, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
topicsSubscription,
Assignment.EMPTY
));
}
GroupSpec groupSpec1 = new GroupSpecImpl(
members1,
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
Map.of()
);
SubscribedTopicDescriberImpl subscribedTopicMetadata1 = new SubscribedTopicDescriberImpl(
metadataImage1
new HashMap<>()
);
GroupAssignment computedAssignment1 = assignor.assign(
groupSpec1,
subscribedTopicMetadata1
);
GroupAssignment computedAssignment = assignor.assign(groupSpec, subscribedTopicMetadata);
assertEveryPartitionGetsAssignment(numPartitions, computedAssignment);
// Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66.
// Step 1 -> T1:0 -> MEMBER_A and T1:1 -> MEMBER_B by hash assignment.
// Step 2 -> T1:2, T2:1 -> MEMBER_A and T2:0 -> MEMBER_B by round-robin assignment.
// Step 3 -> no new assignment gets added by current assignment since it is empty.
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment1 = new HashMap<>();
expectedAssignment1.put(MEMBER_A, mkAssignment(
mkTopicAssignment(TOPIC_1_UUID, 0, 2),
mkTopicAssignment(TOPIC_2_UUID, 1)
));
expectedAssignment1.put(MEMBER_B, mkAssignment(
mkTopicAssignment(TOPIC_1_UUID, 1),
mkTopicAssignment(TOPIC_2_UUID, 0)
));
for (int member = 0; member < numMembers; member++) {
String newMemberId = "M" + member;
members.put(newMemberId, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
topicsSubscription,
new Assignment(computedAssignment.members().get(newMemberId).partitions()))
);
}
// T1: 3 partitions + T2: 2 partitions = 5 partitions
assertEveryPartitionGetsAssignment(5, computedAssignment1);
assertAssignment(expectedAssignment1, computedAssignment1);
// Decrease the number of members one a time, checking that the partitions are assigned as expected
for (int member = numMembers - 1; member > 0; member--) {
String newMemberId = "M" + member;
members.remove(newMemberId);
// New assignment setup - Three members A, B, C subscribing to T2 and T3.
MetadataImage metadataImage2 = new MetadataImageBuilder()
.addTopic(TOPIC_2_UUID, TOPIC_2_NAME, 2)
.addTopic(TOPIC_3_UUID, TOPIC_3_NAME, 3)
.build();
groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
new HashMap<>()
);
Map<String, MemberSubscriptionAndAssignmentImpl> members2 = new HashMap<>();
computedAssignment = assignor.assign(groupSpec, subscribedTopicMetadata);
assertEveryPartitionGetsAssignment(numPartitions, computedAssignment);
Set<Uuid> topicsSubscription2 = new LinkedHashSet<>();
topicsSubscription2.add(TOPIC_2_UUID);
topicsSubscription2.add(TOPIC_3_UUID);
members2.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
topicsSubscription2,
// Utilizing the assignment from current assignment
new Assignment(mkAssignment(
mkTopicAssignment(TOPIC_1_UUID, 0, 2),
mkTopicAssignment(TOPIC_2_UUID, 1)))
));
members2.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
topicsSubscription2,
new Assignment(mkAssignment(
mkTopicAssignment(TOPIC_1_UUID, 1),
mkTopicAssignment(TOPIC_2_UUID, 0)))
));
members2.put(MEMBER_C, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
topicsSubscription2,
Assignment.EMPTY
));
GroupSpec groupSpec2 = new GroupSpecImpl(
members2,
HOMOGENEOUS,
Map.of()
);
SubscribedTopicDescriberImpl subscribedTopicMetadata2 = new SubscribedTopicDescriberImpl(
metadataImage2
);
GroupAssignment computedAssignment2 = assignor.assign(
groupSpec2,
subscribedTopicMetadata2
);
// Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66. Hashcode of MEMBER_C is 67.
// Step 1 -> T2:0 -> MEMBER_A, T2:1 -> MEMBER_B, T3:0 -> MEMBER_C by hash assignment
// Step 2 -> T3:1 -> MEMBER_A, T3:2 -> MEMBER_B by round-robin assignment
// Step 3 -> no new addition by current assignment since T2:0 and T2:1 were already a part of new assignment.
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment2 = new HashMap<>();
expectedAssignment2.put(MEMBER_A, mkAssignment(
mkTopicAssignment(TOPIC_2_UUID, 0),
mkTopicAssignment(TOPIC_3_UUID, 1)
));
expectedAssignment2.put(MEMBER_B, mkAssignment(
mkTopicAssignment(TOPIC_2_UUID, 1),
mkTopicAssignment(TOPIC_3_UUID, 2)
));
expectedAssignment2.put(MEMBER_C, mkAssignment(
mkTopicAssignment(TOPIC_3_UUID, 0)
));
// T2: 2 partitions + T3: 3 partitions = 5 partitions
assertEveryPartitionGetsAssignment(5, computedAssignment2);
assertAssignment(expectedAssignment2, computedAssignment2);
computedAssignment.members().forEach((memberId, partitions) -> members.put(memberId, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
topicsSubscription,
new Assignment(partitions.partitions())
)));
}
}
@Test
@ -905,25 +802,7 @@ public class SimpleAssignorTest {
subscribedTopicMetadata1
);
// Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66. Hashcode of MEMBER_C is 67.
// Step 1 -> T2:2 -> member_A, T3:0 -> member_B, T2:2 -> member_C by hash assignment.
// Step 2 -> T1:0, T1:1, T1:2, T2:0 -> member_A, T3:1, -> member_B, T2:1 -> member_C by round-robin assignment.
// Step 3 -> no new assignment gets added by current assignment since it is empty.
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment1 = new HashMap<>();
expectedAssignment1.put(MEMBER_A, mkAssignment(
mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2),
mkTopicAssignment(TOPIC_2_UUID, 0, 2)
));
expectedAssignment1.put(MEMBER_B, mkAssignment(
mkTopicAssignment(TOPIC_3_UUID, 0, 1)
));
expectedAssignment1.put(MEMBER_C, mkAssignment(
mkTopicAssignment(TOPIC_2_UUID, 1, 2)
));
// T1: 3 partitions + T2: 3 partitions + T3: 2 partitions = 8 partitions
assertEveryPartitionGetsAssignment(8, computedAssignment1);
assertAssignment(expectedAssignment1, computedAssignment1);
// New assignment setup - 2 members A - {T1, T2, T3}, B - {T3, T4}.
@ -977,23 +856,7 @@ public class SimpleAssignorTest {
subscribedTopicMetadata2
);
// Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66.
// Step 1 -> T1:1 -> member_A, T3:0 -> member_B by hash assignment.
// Step 2 -> T2:1 -> member_A, T4:0 -> member_B by round-robin assignment.
// Step 3 -> T1:0, T1:2, T2:0 -> member_A, T3:1 -> member_B by current assignment.
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment2 = new HashMap<>();
expectedAssignment2.put(MEMBER_A, mkAssignment(
mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2),
mkTopicAssignment(TOPIC_2_UUID, 0, 1, 2)
));
expectedAssignment2.put(MEMBER_B, mkAssignment(
mkTopicAssignment(TOPIC_3_UUID, 0, 1),
mkTopicAssignment(TOPIC_4_UUID, 0)
));
// T1: 3 partitions + T2: 3 partitions + T3: 2 partitions + T4: 1 partition = 9 partitions
assertEveryPartitionGetsAssignment(9, computedAssignment2);
assertAssignment(expectedAssignment2, computedAssignment2);
}
private void assertAssignment(
@ -1019,18 +882,6 @@ public class SimpleAssignorTest {
});
}
private void assertFinalAssignment(
Map<String, Set<TopicIdPartition>> expectedAssignment,
Map<String, Set<TopicIdPartition>> computedAssignment
) {
assertEquals(expectedAssignment.size(), computedAssignment.size());
expectedAssignment.forEach((memberId, partitions) -> {
Set<TopicIdPartition> computedPartitions = computedAssignment.getOrDefault(memberId, Set.of());
assertEquals(partitions.size(), computedPartitions.size());
partitions.forEach(member -> assertTrue(computedPartitions.contains(member)));
});
}
private void assertEveryPartitionGetsAssignment(
int expectedPartitions,
GroupAssignment computedGroupAssignment

View File

@ -95,7 +95,7 @@ public class ShareGroupAssignorBenchmark {
@Param({"1", "10", "100"})
private int partitionCount;
@Param({"10", "100"})
@Param({"1", "10", "100"})
private int topicCount;
@Param({"HOMOGENEOUS", "HETEROGENEOUS"})