mirror of https://github.com/apache/kafka.git
KAFKA-19363: Finalize heterogeneous simple share assignor (#20074)
CI / build (push) Waiting to run
Details
CI / build (push) Waiting to run
Details
Finalise the share group SimpleAssignor for heterogeneous subscriptions. The assignor code is much more accurate about the number of partitions assigned to each member, and the number of members assigned for each partition. It eliminates the idea of hash-based assignment because that has been shown to the unhelpful. The revised code is very much more effective at assigning evenly as the number of members grows and shrinks over time. Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
This commit is contained in:
parent
4e31e270ba
commit
860853dba2
|
@ -17,16 +17,10 @@
|
|||
package org.apache.kafka.coordinator.group.assignor;
|
||||
|
||||
import org.apache.kafka.common.Uuid;
|
||||
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
|
||||
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
|
||||
import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
|
||||
import org.apache.kafka.server.common.TopicIdPartition;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
|
@ -58,38 +52,6 @@ public final class AssignorHelpers {
|
|||
return copy;
|
||||
}
|
||||
|
||||
/**
|
||||
* Computes the list of target partitions which can be assigned to members. This list includes all partitions
|
||||
* for the subscribed topic IDs, with the additional check that they must be assignable.
|
||||
* @param groupSpec The assignment spec which includes member metadata.
|
||||
* @param subscribedTopicIds The set of subscribed topic IDs.
|
||||
* @param subscribedTopicDescriber The topic and partition metadata describer.
|
||||
* @return The list of target partitions.
|
||||
*/
|
||||
static List<TopicIdPartition> computeTargetPartitions(
|
||||
GroupSpec groupSpec,
|
||||
Set<Uuid> subscribedTopicIds,
|
||||
SubscribedTopicDescriber subscribedTopicDescriber
|
||||
) {
|
||||
List<TopicIdPartition> targetPartitions = new ArrayList<>();
|
||||
subscribedTopicIds.forEach(topicId -> {
|
||||
int numPartitions = subscribedTopicDescriber.numPartitions(topicId);
|
||||
if (numPartitions == -1) {
|
||||
throw new PartitionAssignorException(
|
||||
"Members are subscribed to topic " + topicId + " which doesn't exist in the topic metadata."
|
||||
);
|
||||
}
|
||||
|
||||
for (int partition = 0; partition < numPartitions; partition++) {
|
||||
if (groupSpec.isPartitionAssignable(topicId, partition)) {
|
||||
targetPartitions.add(new TopicIdPartition(topicId, partition));
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return targetPartitions;
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a HashNap with a known capacity. This is equivalent to HashMap.newHashMap which is introduced in Java 19.
|
||||
* @param numMappings The expected number of mappings.
|
||||
|
|
|
@ -16,29 +16,16 @@
|
|||
*/
|
||||
package org.apache.kafka.coordinator.group.assignor;
|
||||
|
||||
import org.apache.kafka.common.Uuid;
|
||||
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
|
||||
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
|
||||
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
|
||||
import org.apache.kafka.coordinator.group.api.assignor.MemberSubscription;
|
||||
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
|
||||
import org.apache.kafka.coordinator.group.api.assignor.ShareGroupPartitionAssignor;
|
||||
import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
|
||||
import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl;
|
||||
import org.apache.kafka.server.common.TopicIdPartition;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
|
||||
|
||||
|
@ -78,178 +65,16 @@ public class SimpleAssignor implements ShareGroupPartitionAssignor {
|
|||
*/
|
||||
@Override
|
||||
public GroupAssignment assign(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) throws PartitionAssignorException {
|
||||
if (groupSpec.memberIds().isEmpty())
|
||||
if (groupSpec.memberIds().isEmpty()) {
|
||||
return new GroupAssignment(Map.of());
|
||||
}
|
||||
|
||||
if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
|
||||
log.debug("Detected that all members are subscribed to the same set of topics, invoking the homogeneous assignment algorithm");
|
||||
return new SimpleHomogeneousAssignmentBuilder(groupSpec, subscribedTopicDescriber).build();
|
||||
} else {
|
||||
log.debug("Detected that the members are subscribed to different sets of topics, invoking the heterogeneous assignment algorithm");
|
||||
return assignHeterogeneous(groupSpec, subscribedTopicDescriber);
|
||||
return new SimpleHeterogeneousAssignmentBuilder(groupSpec, subscribedTopicDescriber).build();
|
||||
}
|
||||
}
|
||||
|
||||
private GroupAssignment assignHeterogeneous(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
|
||||
Map<String, List<TopicIdPartition>> memberToPartitionsSubscription = new HashMap<>();
|
||||
for (String memberId : groupSpec.memberIds()) {
|
||||
MemberSubscription spec = groupSpec.memberSubscription(memberId);
|
||||
if (spec.subscribedTopicIds().isEmpty())
|
||||
continue;
|
||||
|
||||
// Subscribed topic partitions for the share group member.
|
||||
List<TopicIdPartition> targetPartitions = AssignorHelpers.computeTargetPartitions(groupSpec, spec.subscribedTopicIds(), subscribedTopicDescriber);
|
||||
memberToPartitionsSubscription.put(memberId, targetPartitions);
|
||||
}
|
||||
|
||||
// The current assignment from topic partition to members.
|
||||
Map<TopicIdPartition, List<String>> currentAssignment = currentAssignment(groupSpec);
|
||||
|
||||
return newAssignmentHeterogeneous(groupSpec, memberToPartitionsSubscription, currentAssignment);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the current assignment by topic partitions.
|
||||
*
|
||||
* @param groupSpec The group metadata specifications.
|
||||
* @return the current assignment for subscribed topic partitions to memberIds.
|
||||
*/
|
||||
static Map<TopicIdPartition, List<String>> currentAssignment(GroupSpec groupSpec) {
|
||||
Map<TopicIdPartition, List<String>> assignment = new HashMap<>();
|
||||
|
||||
for (String member : groupSpec.memberIds()) {
|
||||
Map<Uuid, Set<Integer>> assignedTopicPartitions = groupSpec.memberAssignment(member).partitions();
|
||||
assignedTopicPartitions.forEach((topicId, partitions) -> partitions.forEach(
|
||||
partition -> assignment.computeIfAbsent(new TopicIdPartition(topicId, partition), k -> new ArrayList<>()).add(member)));
|
||||
}
|
||||
|
||||
return assignment;
|
||||
}
|
||||
|
||||
/**
|
||||
* This function computes the new assignment for a heterogeneous group.
|
||||
*
|
||||
* @param groupSpec The group metadata specifications.
|
||||
* @param memberToPartitionsSubscription The member to subscribed topic partitions map.
|
||||
* @param currentAssignment The current assignment for subscribed topic partitions to memberIds.
|
||||
* @return the new partition assignment for the members of the group.
|
||||
*/
|
||||
private GroupAssignment newAssignmentHeterogeneous(
|
||||
GroupSpec groupSpec,
|
||||
Map<String, List<TopicIdPartition>> memberToPartitionsSubscription,
|
||||
Map<TopicIdPartition, List<String>> currentAssignment
|
||||
) {
|
||||
int numGroupMembers = groupSpec.memberIds().size();
|
||||
|
||||
// Exhaustive set of all subscribed topic partitions.
|
||||
Set<TopicIdPartition> targetPartitions = new LinkedHashSet<>();
|
||||
memberToPartitionsSubscription.values().forEach(targetPartitions::addAll);
|
||||
|
||||
// Create a map for topic to members subscription.
|
||||
Map<Uuid, Set<String>> topicToMemberSubscription = new HashMap<>();
|
||||
memberToPartitionsSubscription.forEach((member, partitions) ->
|
||||
partitions.forEach(partition -> topicToMemberSubscription.computeIfAbsent(partition.topicId(), k -> new LinkedHashSet<>()).add(member)));
|
||||
|
||||
Map<TopicIdPartition, List<String>> newAssignment = new HashMap<>();
|
||||
|
||||
// Step 1: Hash member IDs to partitions.
|
||||
memberToPartitionsSubscription.forEach((member, partitions) ->
|
||||
memberHashAssignment(List.of(member), partitions, newAssignment));
|
||||
|
||||
// Step 2: Round-robin assignment for unassigned partitions which do not have members already assigned in the current assignment.
|
||||
Set<TopicIdPartition> assignedPartitions = new LinkedHashSet<>(newAssignment.keySet());
|
||||
Map<Uuid, List<TopicIdPartition>> unassignedPartitions = new HashMap<>();
|
||||
targetPartitions.forEach(targetPartition -> {
|
||||
if (!assignedPartitions.contains(targetPartition) && !currentAssignment.containsKey(targetPartition))
|
||||
unassignedPartitions.computeIfAbsent(targetPartition.topicId(), k -> new ArrayList<>()).add(targetPartition);
|
||||
});
|
||||
|
||||
unassignedPartitions.keySet().forEach(unassignedTopic ->
|
||||
roundRobinAssignment(topicToMemberSubscription.get(unassignedTopic), unassignedPartitions.get(unassignedTopic), newAssignment));
|
||||
|
||||
// Step 3: We combine current assignment and new assignment.
|
||||
Map<String, Set<TopicIdPartition>> finalAssignment = AssignorHelpers.newHashMap(numGroupMembers);
|
||||
|
||||
newAssignment.forEach((targetPartition, members) -> members.forEach(member ->
|
||||
finalAssignment.computeIfAbsent(member, k -> new HashSet<>()).add(targetPartition)));
|
||||
|
||||
// When combining current assignment, we need to only consider the member topic subscription in current assignment
|
||||
// which is being subscribed in the new assignment as well.
|
||||
currentAssignment.forEach((topicIdPartition, members) -> members.forEach(member -> {
|
||||
if (topicToMemberSubscription.getOrDefault(topicIdPartition.topicId(), Set.of()).contains(member)
|
||||
&& !newAssignment.containsKey(topicIdPartition))
|
||||
finalAssignment.computeIfAbsent(member, k -> new HashSet<>()).add(topicIdPartition);
|
||||
}));
|
||||
|
||||
return groupAssignment(finalAssignment, groupSpec.memberIds());
|
||||
}
|
||||
|
||||
/**
|
||||
* This function updates assignment by hashing the member IDs of the members and maps the partitions assigned to the
|
||||
* members based on the hash, one partition per member. This gives approximately even balance.
|
||||
*
|
||||
* @param memberIds The member ids to which the topic partitions need to be assigned.
|
||||
* @param partitionsToAssign The subscribed topic partitions which needs assignment.
|
||||
* @param assignment The existing assignment by topic partition. We need to pass it as a parameter because this
|
||||
* method can be called multiple times for heterogeneous assignment.
|
||||
*/
|
||||
// Visible for testing
|
||||
void memberHashAssignment(
|
||||
Collection<String> memberIds,
|
||||
List<TopicIdPartition> partitionsToAssign,
|
||||
Map<TopicIdPartition, List<String>> assignment
|
||||
) {
|
||||
if (!partitionsToAssign.isEmpty()) {
|
||||
for (String memberId : memberIds) {
|
||||
int topicPartitionIndex = Math.abs(memberId.hashCode() % partitionsToAssign.size());
|
||||
TopicIdPartition topicPartition = partitionsToAssign.get(topicPartitionIndex);
|
||||
assignment.computeIfAbsent(topicPartition, k -> new ArrayList<>()).add(memberId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This functions assigns topic partitions to members by a round-robin approach and updates the existing assignment.
|
||||
*
|
||||
* @param memberIds The member ids to which the topic partitions need to be assigned, should be non-empty.
|
||||
* @param partitionsToAssign The subscribed topic partitions which needs assignment.
|
||||
* @param assignment The existing assignment by topic partition. We need to pass it as a parameter because this
|
||||
* method can be called multiple times for heterogeneous assignment.
|
||||
*/
|
||||
// Visible for testing
|
||||
void roundRobinAssignment(
|
||||
Collection<String> memberIds,
|
||||
List<TopicIdPartition> partitionsToAssign,
|
||||
Map<TopicIdPartition, List<String>> assignment
|
||||
) {
|
||||
// We iterate through the target partitions and assign a memberId to them. In case we run out of members (members < targetPartitions),
|
||||
// we again start from the starting index of memberIds.
|
||||
Iterator<String> memberIdIterator = memberIds.iterator();
|
||||
for (TopicIdPartition topicPartition : partitionsToAssign) {
|
||||
if (!memberIdIterator.hasNext()) {
|
||||
memberIdIterator = memberIds.iterator();
|
||||
}
|
||||
String memberId = memberIdIterator.next();
|
||||
assignment.computeIfAbsent(topicPartition, k -> new ArrayList<>()).add(memberId);
|
||||
}
|
||||
}
|
||||
|
||||
private GroupAssignment groupAssignment(
|
||||
Map<String, Set<TopicIdPartition>> assignmentByMember,
|
||||
Collection<String> allGroupMembers
|
||||
) {
|
||||
Map<String, MemberAssignment> members = new HashMap<>();
|
||||
for (Map.Entry<String, Set<TopicIdPartition>> entry : assignmentByMember.entrySet()) {
|
||||
Map<Uuid, Set<Integer>> targetPartitions = new HashMap<>();
|
||||
entry.getValue().forEach(targetPartition ->
|
||||
targetPartitions.computeIfAbsent(targetPartition.topicId(), k -> new HashSet<>()).add(targetPartition.partitionId()));
|
||||
members.put(entry.getKey(), new MemberAssignmentImpl(targetPartitions));
|
||||
}
|
||||
allGroupMembers.forEach(member -> {
|
||||
if (!members.containsKey(member))
|
||||
members.put(member, new MemberAssignmentImpl(new HashMap<>()));
|
||||
});
|
||||
|
||||
return new GroupAssignment(members);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,461 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.kafka.coordinator.group.assignor;
|
||||
|
||||
import org.apache.kafka.common.Uuid;
|
||||
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
|
||||
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
|
||||
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
|
||||
import org.apache.kafka.coordinator.group.api.assignor.MemberSubscription;
|
||||
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
|
||||
import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
|
||||
import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl;
|
||||
import org.apache.kafka.server.common.TopicIdPartition;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* The heterogeneous simple assignment builder is used to generate the target assignment for a share group with
|
||||
* at least one of its members subscribed to a different set of topics.
|
||||
* <p>
|
||||
* Assignments are done according to the following principles:
|
||||
* <ol>
|
||||
* <li>Balance: Ensure partitions are distributed equally among all members.
|
||||
* The difference in assignments sizes between any two members
|
||||
* should not exceed one partition.</li>
|
||||
* <li>Stickiness: Minimize partition movements among members by retaining
|
||||
* as much of the existing assignment as possible.</li>
|
||||
* </ol>
|
||||
* <p>
|
||||
* Balance is prioritized above stickiness.
|
||||
* <p>
|
||||
* Note that balance is computed for each topic individually and then the assignments are combined. So, if M1
|
||||
* is subscribed to T1, M2 is subscribed to T2, and M3 is subscribed to T1 and T2, M3 will get assigned half
|
||||
* of the partitions for T1 and half of the partitions for T2.
|
||||
*/
|
||||
public class SimpleHeterogeneousAssignmentBuilder {
|
||||
|
||||
/**
|
||||
* The list of all the topic Ids that the share group is subscribed to.
|
||||
*/
|
||||
private final Set<Uuid> subscribedTopicIds;
|
||||
|
||||
/**
|
||||
* The list of members in the consumer group.
|
||||
*/
|
||||
private final List<String> memberIds;
|
||||
|
||||
/**
|
||||
* Maps member ids to their indices in the memberIds list.
|
||||
*/
|
||||
private final Map<String, Integer> memberIndices;
|
||||
|
||||
/**
|
||||
* Subscribed members by topic.
|
||||
*/
|
||||
private final Map<Uuid, List<Integer>> subscribedMembersByTopic;
|
||||
|
||||
/**
|
||||
* The list of all the topic-partitions assignable for the share group, grouped by topic.
|
||||
*/
|
||||
private final Map<Uuid, List<TopicIdPartition>> targetPartitionsByTopic;
|
||||
|
||||
/**
|
||||
* The number of members in the share group.
|
||||
*/
|
||||
private final int numGroupMembers;
|
||||
|
||||
/**
|
||||
* The share group assignment from the group metadata specification at the start of the assignment operation.
|
||||
* <p>
|
||||
* Members are stored as integer indices into the memberIds array.
|
||||
*/
|
||||
private final Map<Integer, Map<Uuid, Set<Integer>>> oldGroupAssignment;
|
||||
|
||||
/**
|
||||
* The share group assignment calculated iteratively by the assignment operation. Entries in this map override those
|
||||
* in the old group assignment map.
|
||||
* <p>
|
||||
* Members are stored as integer indices into the memberIds array.
|
||||
*/
|
||||
private final Map<Integer, Map<Uuid, Set<Integer>>> newGroupAssignment;
|
||||
|
||||
public SimpleHeterogeneousAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
|
||||
this.subscribedTopicIds = new HashSet<>();
|
||||
groupSpec.memberIds().forEach(memberId -> subscribedTopicIds.addAll(groupSpec.memberSubscription(memberId).subscribedTopicIds()));
|
||||
|
||||
// Number the members 0 to M - 1.
|
||||
this.numGroupMembers = groupSpec.memberIds().size();
|
||||
this.memberIds = new ArrayList<>(groupSpec.memberIds());
|
||||
this.memberIndices = AssignorHelpers.newHashMap(numGroupMembers);
|
||||
for (int memberIndex = 0; memberIndex < numGroupMembers; memberIndex++) {
|
||||
memberIndices.put(memberIds.get(memberIndex), memberIndex);
|
||||
}
|
||||
|
||||
this.targetPartitionsByTopic = computeTargetPartitions(groupSpec, subscribedTopicIds, subscribedTopicDescriber);
|
||||
this.subscribedMembersByTopic = computeSubscribedMembers(groupSpec, subscribedTopicIds, memberIndices);
|
||||
|
||||
this.oldGroupAssignment = AssignorHelpers.newHashMap(numGroupMembers);
|
||||
this.newGroupAssignment = AssignorHelpers.newHashMap(numGroupMembers);
|
||||
|
||||
// Extract the old group assignment from the group metadata specification.
|
||||
groupSpec.memberIds().forEach(memberId -> {
|
||||
int memberIndex = memberIndices.get(memberId);
|
||||
oldGroupAssignment.put(memberIndex, groupSpec.memberAssignment(memberId).partitions());
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Here's the step-by-step breakdown of the assignment process, performed for each subscribed topic in turn:
|
||||
* <ol>
|
||||
* <li>Revoke partitions from the existing assignment that are no longer part of each member's subscriptions.</li>
|
||||
* <li>Revoke partitions from members which have too many partitions.</li>
|
||||
* <li>Revoke any partitions which are shared more than desired.</li>
|
||||
* <li>Assign any partitions which have insufficient members assigned.</li>
|
||||
* </ol>
|
||||
*/
|
||||
public GroupAssignment build() {
|
||||
if (subscribedTopicIds.isEmpty()) {
|
||||
return new GroupAssignment(Map.of());
|
||||
}
|
||||
|
||||
// Compute the partition assignments for each topic separately.
|
||||
subscribedTopicIds.forEach(topicId -> {
|
||||
TopicAssignmentPartialBuilder topicAssignmentBuilder =
|
||||
new TopicAssignmentPartialBuilder(topicId, numGroupMembers, targetPartitionsByTopic.get(topicId), subscribedMembersByTopic.get(topicId));
|
||||
topicAssignmentBuilder.build();
|
||||
});
|
||||
|
||||
// Combine the old and the new group assignments to give the result.
|
||||
Map<String, MemberAssignment> targetAssignment = AssignorHelpers.newHashMap(numGroupMembers);
|
||||
for (int memberIndex = 0; memberIndex < numGroupMembers; memberIndex++) {
|
||||
Map<Uuid, Set<Integer>> memberAssignment = newGroupAssignment.get(memberIndex);
|
||||
if (memberAssignment == null) {
|
||||
targetAssignment.put(memberIds.get(memberIndex), new MemberAssignmentImpl(oldGroupAssignment.get(memberIndex)));
|
||||
} else {
|
||||
targetAssignment.put(memberIds.get(memberIndex), new MemberAssignmentImpl(memberAssignment));
|
||||
}
|
||||
}
|
||||
|
||||
return new GroupAssignment(targetAssignment);
|
||||
}
|
||||
|
||||
/**
|
||||
* Computes the list of target partitions which can be assigned to members. This list includes all partitions
|
||||
* for the subscribed topic IDs, with the additional check that they must be assignable.
|
||||
* @param groupSpec The assignment spec which includes member metadata.
|
||||
* @param subscribedTopicIds The set of subscribed topic IDs.
|
||||
* @param subscribedTopicDescriber The topic and partition metadata describer.
|
||||
* @return The list of target partitions, grouped by topic.
|
||||
*/
|
||||
private static Map<Uuid, List<TopicIdPartition>> computeTargetPartitions(
|
||||
GroupSpec groupSpec,
|
||||
Set<Uuid> subscribedTopicIds,
|
||||
SubscribedTopicDescriber subscribedTopicDescriber
|
||||
) {
|
||||
Map<Uuid, List<TopicIdPartition>> targetPartitionsByTopic = AssignorHelpers.newHashMap(subscribedTopicIds.size());
|
||||
subscribedTopicIds.forEach(topicId -> {
|
||||
int numPartitions = subscribedTopicDescriber.numPartitions(topicId);
|
||||
if (numPartitions == -1) {
|
||||
throw new PartitionAssignorException(
|
||||
"Members are subscribed to topic " + topicId + " which doesn't exist in the topic metadata."
|
||||
);
|
||||
}
|
||||
|
||||
List<TopicIdPartition> targetPartitions = new ArrayList<>(numPartitions);
|
||||
for (int partition = 0; partition < numPartitions; partition++) {
|
||||
if (groupSpec.isPartitionAssignable(topicId, partition)) {
|
||||
targetPartitions.add(new TopicIdPartition(topicId, partition));
|
||||
}
|
||||
}
|
||||
targetPartitionsByTopic.put(topicId, targetPartitions);
|
||||
});
|
||||
|
||||
return targetPartitionsByTopic;
|
||||
}
|
||||
|
||||
/**
|
||||
* Computes the list of member indices which are subscribed to each topic.
|
||||
* @param groupSpec The assignment spec which includes member metadata.
|
||||
* @param subscribedTopicIds The set of subscribed topic IDs.
|
||||
* @param memberIndices The map from member IDs to member indices.
|
||||
* @return The list of member indices, grouped by topic.
|
||||
*/
|
||||
private static Map<Uuid, List<Integer>> computeSubscribedMembers(
|
||||
GroupSpec groupSpec,
|
||||
Set<Uuid> subscribedTopicIds,
|
||||
Map<String, Integer> memberIndices
|
||||
) {
|
||||
int numMembers = memberIndices.size();
|
||||
Map<Uuid, List<Integer>> subscribedMembersByTopic = AssignorHelpers.newHashMap(subscribedTopicIds.size());
|
||||
groupSpec.memberIds().forEach(memberId -> {
|
||||
int memberIndex = memberIndices.get(memberId);
|
||||
MemberSubscription memberSubscription = groupSpec.memberSubscription(memberId);
|
||||
memberSubscription.subscribedTopicIds().forEach(topicId ->
|
||||
subscribedMembersByTopic.computeIfAbsent(topicId, k -> new ArrayList<>(numMembers)).add(memberIndex));
|
||||
});
|
||||
|
||||
return subscribedMembersByTopic;
|
||||
}
|
||||
|
||||
/**
|
||||
* A class that computes the assignment for one topic.
|
||||
*/
|
||||
private final class TopicAssignmentPartialBuilder {
|
||||
|
||||
/**
|
||||
* The topic ID.
|
||||
*/
|
||||
private final Uuid topicId;
|
||||
|
||||
/**
|
||||
* The list of assignable topic-partitions for this topic.
|
||||
*/
|
||||
private final List<TopicIdPartition> targetPartitions;
|
||||
|
||||
/**
|
||||
* The list of member indices subscribed to this topic.
|
||||
*/
|
||||
private final List<Integer> subscribedMembers;
|
||||
|
||||
/**
|
||||
* The final assignment, grouped by partition index, progressively computed during the assignment building.
|
||||
*/
|
||||
private final Map<Integer, Set<Integer>> finalAssignmentByPartition;
|
||||
|
||||
/**
|
||||
* The final assignment, grouped by member index, progressively computed during the assignment building.
|
||||
*/
|
||||
private final Map<Integer, Set<Integer>> finalAssignmentByMember;
|
||||
|
||||
/**
|
||||
* The desired sharing for each target partition.
|
||||
* For entirely balanced assignment, we would expect (numTargetPartitions / numGroupMembers) partitions per member, rounded upwards.
|
||||
* That can be expressed as: Math.ceil(numTargetPartitions / (double) numGroupMembers)
|
||||
*/
|
||||
private final Integer desiredSharing;
|
||||
|
||||
/**
|
||||
* The desired number of assignments for each share group member.
|
||||
* <p>
|
||||
* Members are stored as integer indices into the memberIds array.
|
||||
*/
|
||||
private final int[] desiredAssignmentCounts;
|
||||
|
||||
public TopicAssignmentPartialBuilder(Uuid topicId, int numGroupMembers, List<TopicIdPartition> targetPartitions, List<Integer> subscribedMembers) {
|
||||
this.topicId = topicId;
|
||||
this.targetPartitions = targetPartitions;
|
||||
this.subscribedMembers = subscribedMembers;
|
||||
this.finalAssignmentByPartition = AssignorHelpers.newHashMap(targetPartitions.size());
|
||||
this.finalAssignmentByMember = AssignorHelpers.newHashMap(subscribedMembers.size());
|
||||
|
||||
int numTargetPartitions = targetPartitions.size();
|
||||
int numSubscribedMembers = subscribedMembers.size();
|
||||
if (numTargetPartitions == 0) {
|
||||
this.desiredSharing = 0;
|
||||
} else {
|
||||
this.desiredSharing = (numSubscribedMembers + numTargetPartitions - 1) / numTargetPartitions;
|
||||
}
|
||||
|
||||
// Calculate the desired number of assignments for each member.
|
||||
// The precise desired assignment count per target partition. This can be a fractional number.
|
||||
// We would expect (numSubscribedMembers / numTargetPartitions) assignments per partition, rounded upwards.
|
||||
// Using integer arithmetic: (numSubscribedMembers + numTargetPartitions - 1) / numTargetPartitions
|
||||
this.desiredAssignmentCounts = new int[numGroupMembers];
|
||||
double preciseDesiredAssignmentCount = desiredSharing * numTargetPartitions / (double) numSubscribedMembers;
|
||||
for (int memberIndex = 0; memberIndex < numSubscribedMembers; memberIndex++) {
|
||||
desiredAssignmentCounts[subscribedMembers.get(memberIndex)] =
|
||||
(int) Math.ceil(preciseDesiredAssignmentCount * (double) (memberIndex + 1)) -
|
||||
(int) Math.ceil(preciseDesiredAssignmentCount * (double) memberIndex);
|
||||
}
|
||||
}
|
||||
|
||||
public void build() {
|
||||
// The order of steps here is not that significant, but assignRemainingPartitions must go last.
|
||||
revokeUnassignablePartitions();
|
||||
|
||||
revokeOverfilledMembers();
|
||||
|
||||
revokeOversharedPartitions();
|
||||
|
||||
// Add in any partitions which are currently not in the assignment.
|
||||
targetPartitions.forEach(topicPartition ->
|
||||
finalAssignmentByPartition.computeIfAbsent(topicPartition.partitionId(), k -> AssignorHelpers.newHashSet(subscribedMembers.size())));
|
||||
|
||||
assignRemainingPartitions();
|
||||
}
|
||||
|
||||
/**
|
||||
* Examine the members from the current assignment, making sure that no member has too many assigned partitions.
|
||||
* When looking at the current assignment, we need to only consider the topics in the current assignment that are
|
||||
* also being subscribed in the new assignment.
|
||||
*/
|
||||
private void revokeUnassignablePartitions() {
|
||||
for (Map.Entry<Integer, Map<Uuid, Set<Integer>>> entry : oldGroupAssignment.entrySet()) {
|
||||
Integer memberIndex = entry.getKey();
|
||||
Map<Uuid, Set<Integer>> oldMemberAssignment = entry.getValue();
|
||||
Map<Uuid, Set<Integer>> newMemberAssignment = null;
|
||||
|
||||
if (oldMemberAssignment.isEmpty()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
Set<Integer> assignedPartitions = oldMemberAssignment.get(topicId);
|
||||
if (assignedPartitions != null) {
|
||||
if (subscribedTopicIds.contains(topicId)) {
|
||||
for (int partition : assignedPartitions) {
|
||||
finalAssignmentByPartition.computeIfAbsent(partition, k -> new HashSet<>()).add(memberIndex);
|
||||
finalAssignmentByMember.computeIfAbsent(memberIndex, k -> new HashSet<>()).add(partition);
|
||||
}
|
||||
} else {
|
||||
// We create a deep copy of the original assignment so we can alter it.
|
||||
newMemberAssignment = AssignorHelpers.deepCopyAssignment(oldMemberAssignment);
|
||||
|
||||
// Remove the entire topic.
|
||||
newMemberAssignment.remove(topicId);
|
||||
}
|
||||
|
||||
if (newMemberAssignment != null) {
|
||||
newGroupAssignment.put(memberIndex, newMemberAssignment);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Revoke partitions from members which are overfilled.
|
||||
*/
|
||||
private void revokeOverfilledMembers() {
|
||||
finalAssignmentByMember.forEach((memberIndex, assignedPartitions) -> {
|
||||
int memberDesiredAssignmentCount = desiredAssignmentCounts[memberIndex];
|
||||
if (assignedPartitions.size() > memberDesiredAssignmentCount) {
|
||||
Map<Uuid, Set<Integer>> newMemberAssignment = newGroupAssignment.get(memberIndex);
|
||||
Iterator<Integer> partitionIterator = assignedPartitions.iterator();
|
||||
while (partitionIterator.hasNext() && (assignedPartitions.size() > memberDesiredAssignmentCount)) {
|
||||
int partitionIndex = partitionIterator.next();
|
||||
finalAssignmentByPartition.get(partitionIndex).remove(memberIndex);
|
||||
partitionIterator.remove();
|
||||
if (newMemberAssignment == null) {
|
||||
newMemberAssignment = AssignorHelpers.deepCopyAssignment(oldGroupAssignment.get(memberIndex));
|
||||
newGroupAssignment.put(memberIndex, newMemberAssignment);
|
||||
}
|
||||
newMemberAssignment.get(topicId).remove(partitionIndex);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Revoke any over-shared partitions.
|
||||
*/
|
||||
private void revokeOversharedPartitions() {
|
||||
finalAssignmentByPartition.forEach((partitionIndex, assignedMembers) -> {
|
||||
int assignedMemberCount = assignedMembers.size();
|
||||
if (assignedMemberCount > desiredSharing) {
|
||||
Iterator<Integer> assignedMemberIterator = assignedMembers.iterator();
|
||||
while (assignedMemberIterator.hasNext()) {
|
||||
Integer memberIndex = assignedMemberIterator.next();
|
||||
Map<Uuid, Set<Integer>> newMemberAssignment = newGroupAssignment.get(memberIndex);
|
||||
if (newMemberAssignment == null) {
|
||||
newMemberAssignment = AssignorHelpers.deepCopyAssignment(oldGroupAssignment.get(memberIndex));
|
||||
newGroupAssignment.put(memberIndex, newMemberAssignment);
|
||||
}
|
||||
Set<Integer> partitions = newMemberAssignment.get(topicId);
|
||||
if (partitions != null) {
|
||||
if (partitions.remove(partitionIndex)) {
|
||||
assignedMemberCount--;
|
||||
assignedMemberIterator.remove();
|
||||
finalAssignmentByMember.get(memberIndex).remove(partitionIndex);
|
||||
}
|
||||
}
|
||||
if (assignedMemberCount <= desiredSharing) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Assign partitions to unfilled members. It repeatedly iterates through the unfilled members while running
|
||||
* once through the set of partitions. When a partition is found that has insufficient sharing, it attempts to assign
|
||||
* to one of the members.
|
||||
* <p>
|
||||
* There is one tricky case here and that's where a partition wants another assignment, but none of the unfilled
|
||||
* members are able to take it (because they already have that partition). In this situation, we just accept that
|
||||
* no additional assignments for this partition could be made and carry on. In theory, a different shuffling of the
|
||||
* partitions would be able to achieve better balance, but it's harmless tolerating a slight imbalance in this case.
|
||||
* <p>
|
||||
* Note that finalAssignmentByMember is not maintained by this method which is expected to be the final step in the
|
||||
* computation.
|
||||
*/
|
||||
private void assignRemainingPartitions() {
|
||||
Set<Integer> unfilledMembers = AssignorHelpers.newHashSet(numGroupMembers);
|
||||
subscribedMembersByTopic.get(topicId).forEach(memberIndex -> {
|
||||
Set<Integer> assignedPartitions = finalAssignmentByMember.get(memberIndex);
|
||||
int numberOfAssignedPartitions = (assignedPartitions == null) ? 0 : assignedPartitions.size();
|
||||
if (numberOfAssignedPartitions < desiredAssignmentCounts[memberIndex]) {
|
||||
unfilledMembers.add(memberIndex);
|
||||
}
|
||||
});
|
||||
|
||||
Iterator<Integer> memberIterator = unfilledMembers.iterator();
|
||||
boolean partitionAssignedForThisIterator = false;
|
||||
for (Map.Entry<Integer, Set<Integer>> partitionAssignment : finalAssignmentByPartition.entrySet()) {
|
||||
int partitionIndex = partitionAssignment.getKey();
|
||||
Set<Integer> membersAssigned = partitionAssignment.getValue();
|
||||
|
||||
if (membersAssigned.size() < desiredSharing) {
|
||||
int assignmentsToMake = desiredSharing - membersAssigned.size();
|
||||
while (assignmentsToMake > 0) {
|
||||
if (!memberIterator.hasNext()) {
|
||||
if (!partitionAssignedForThisIterator) {
|
||||
break;
|
||||
}
|
||||
memberIterator = unfilledMembers.iterator();
|
||||
partitionAssignedForThisIterator = false;
|
||||
}
|
||||
int memberIndex = memberIterator.next();
|
||||
if (!membersAssigned.contains(memberIndex)) {
|
||||
Map<Uuid, Set<Integer>> newMemberAssignment = newGroupAssignment.get(memberIndex);
|
||||
if (newMemberAssignment == null) {
|
||||
newMemberAssignment = AssignorHelpers.deepCopyAssignment(oldGroupAssignment.get(memberIndex));
|
||||
newGroupAssignment.put(memberIndex, newMemberAssignment);
|
||||
}
|
||||
newMemberAssignment.computeIfAbsent(topicId, k -> new HashSet<>()).add(partitionIndex);
|
||||
finalAssignmentByMember.computeIfAbsent(memberIndex, k -> new HashSet<>()).add(partitionIndex);
|
||||
assignmentsToMake--;
|
||||
partitionAssignedForThisIterator = true;
|
||||
if (finalAssignmentByMember.get(memberIndex).size() >= desiredAssignmentCounts[memberIndex]) {
|
||||
memberIterator.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (unfilledMembers.isEmpty()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -20,6 +20,7 @@ import org.apache.kafka.common.Uuid;
|
|||
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
|
||||
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
|
||||
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
|
||||
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
|
||||
import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
|
||||
import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl;
|
||||
import org.apache.kafka.server.common.TopicIdPartition;
|
||||
|
@ -141,7 +142,7 @@ public class SimpleHomogeneousAssignmentBuilder {
|
|||
memberIndices.put(memberIds.get(memberIndex), memberIndex);
|
||||
}
|
||||
|
||||
this.targetPartitions = AssignorHelpers.computeTargetPartitions(groupSpec, subscribedTopicIds, subscribedTopicDescriber);
|
||||
this.targetPartitions = computeTargetPartitions(groupSpec, subscribedTopicIds, subscribedTopicDescriber);
|
||||
|
||||
int numTargetPartitions = targetPartitions.size();
|
||||
if (numTargetPartitions == 0) {
|
||||
|
@ -328,10 +329,10 @@ public class SimpleHomogeneousAssignmentBuilder {
|
|||
|
||||
/**
|
||||
* Assign partitions to unfilled members. It repeatedly iterates through the unfilled members while running
|
||||
* once thrown the set of partitions. When a partition is found that has insufficient sharing, it attempts to assign
|
||||
* to one of the partitions.
|
||||
* once through the set of partitions. When a partition is found that has insufficient sharing, it attempts to assign
|
||||
* to one of the members.
|
||||
* <p>
|
||||
* There is one tricky cases here and that's where a partition wants another assignment, but none of the unfilled
|
||||
* There is one tricky case here and that's where a partition wants another assignment, but none of the unfilled
|
||||
* members are able to take it (because they already have that partition). In this situation, we just accept that
|
||||
* no additional assignments for this partition could be made and carry on. In theory, a different shuffling of the
|
||||
* partitions would be able to achieve better balance, but it's harmless tolerating a slight imbalance in this case.
|
||||
|
@ -382,4 +383,36 @@ public class SimpleHomogeneousAssignmentBuilder {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Computes the list of target partitions which can be assigned to members. This list includes all partitions
|
||||
* for the subscribed topic IDs, with the additional check that they must be assignable.
|
||||
* @param groupSpec The assignment spec which includes member metadata.
|
||||
* @param subscribedTopicIds The set of subscribed topic IDs.
|
||||
* @param subscribedTopicDescriber The topic and partition metadata describer.
|
||||
* @return The list of target partitions.
|
||||
*/
|
||||
private static List<TopicIdPartition> computeTargetPartitions(
|
||||
GroupSpec groupSpec,
|
||||
Set<Uuid> subscribedTopicIds,
|
||||
SubscribedTopicDescriber subscribedTopicDescriber
|
||||
) {
|
||||
List<TopicIdPartition> targetPartitions = new ArrayList<>();
|
||||
subscribedTopicIds.forEach(topicId -> {
|
||||
int numPartitions = subscribedTopicDescriber.numPartitions(topicId);
|
||||
if (numPartitions == -1) {
|
||||
throw new PartitionAssignorException(
|
||||
"Members are subscribed to topic " + topicId + " which doesn't exist in the topic metadata."
|
||||
);
|
||||
}
|
||||
|
||||
for (int partition = 0; partition < numPartitions; partition++) {
|
||||
if (groupSpec.isPartitionAssignable(topicId, partition)) {
|
||||
targetPartitions.add(new TopicIdPartition(topicId, partition));
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return targetPartitions;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,10 +31,10 @@ import org.apache.kafka.server.common.TopicIdPartition;
|
|||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
@ -45,7 +45,6 @@ import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.H
|
|||
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class SimpleAssignorTest {
|
||||
|
||||
|
@ -209,7 +208,7 @@ public class SimpleAssignorTest {
|
|||
public void testAssignWithTwoMembersAndTwoTopicsHomogeneousWithAllowedMap() {
|
||||
MetadataImage metadataImage = new MetadataImageBuilder()
|
||||
.addTopic(TOPIC_1_UUID, TOPIC_1_NAME, 3)
|
||||
.addTopic(TOPIC_3_UUID, TOPIC_3_NAME, 2)
|
||||
.addTopic(TOPIC_3_UUID, TOPIC_3_NAME, 3)
|
||||
.build();
|
||||
|
||||
Map<String, MemberSubscriptionAndAssignmentImpl> members = new HashMap<>();
|
||||
|
@ -239,7 +238,7 @@ public class SimpleAssignorTest {
|
|||
Optional.of(
|
||||
Map.of(
|
||||
TOPIC_1_UUID, Set.of(0, 1, 2),
|
||||
TOPIC_3_UUID, Set.of(0, 1)
|
||||
TOPIC_3_UUID, Set.of(0, 1) // but not 2
|
||||
)
|
||||
)
|
||||
);
|
||||
|
@ -354,25 +353,8 @@ public class SimpleAssignorTest {
|
|||
subscribedTopicMetadata
|
||||
);
|
||||
|
||||
// Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66. Hashcode of MEMBER_C is 67.
|
||||
// Step 1 -> T2:2 -> member_A, T3:0 -> member_B, T2:2 -> member_C by hash assignment.
|
||||
// Step 2 -> T1:0, T1:1, T1:2, T2:0 -> member_A, T3:1, -> member_B, T2:1 -> member_C by round-robin assignment.
|
||||
// Step 3 -> no new assignment gets added by current assignment since it is empty.
|
||||
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
|
||||
expectedAssignment.put(MEMBER_A, mkAssignment(
|
||||
mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2),
|
||||
mkTopicAssignment(TOPIC_2_UUID, 0, 2)
|
||||
));
|
||||
expectedAssignment.put(MEMBER_B, mkAssignment(
|
||||
mkTopicAssignment(TOPIC_3_UUID, 0, 1)
|
||||
));
|
||||
expectedAssignment.put(MEMBER_C, mkAssignment(
|
||||
mkTopicAssignment(TOPIC_2_UUID, 1, 2)
|
||||
));
|
||||
|
||||
// T1: 3 partitions + T2: 3 partitions + T3: 2 partitions = 8 partitions
|
||||
assertEveryPartitionGetsAssignment(8, computedAssignment);
|
||||
assertAssignment(expectedAssignment, computedAssignment);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -418,9 +400,9 @@ public class SimpleAssignorTest {
|
|||
Map.of(),
|
||||
Optional.of(
|
||||
Map.of(
|
||||
TOPIC_1_UUID, Set.of(0, 1, 2),
|
||||
TOPIC_2_UUID, Set.of(0, 1, 2),
|
||||
TOPIC_3_UUID, Set.of(0, 1)
|
||||
TOPIC_1_UUID, Set.of(0, 1), // but not 2
|
||||
TOPIC_2_UUID, Set.of(0, 2), // but not 1
|
||||
TOPIC_3_UUID, Set.of(1) // but not 0
|
||||
)
|
||||
)
|
||||
);
|
||||
|
@ -434,25 +416,8 @@ public class SimpleAssignorTest {
|
|||
subscribedTopicMetadata
|
||||
);
|
||||
|
||||
// Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66. Hashcode of MEMBER_C is 67.
|
||||
// Step 1 -> T2:2 -> member_A, T3:0 -> member_B, T2:2 -> member_C by hash assignment.
|
||||
// Step 2 -> T1:0, T1:1, T1:2, T2:0 -> member_A, T3:1, -> member_B, T2:1 -> member_C by round-robin assignment.
|
||||
// Step 3 -> no new assignment gets added by current assignment since it is empty.
|
||||
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
|
||||
expectedAssignment.put(MEMBER_A, mkAssignment(
|
||||
mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2),
|
||||
mkTopicAssignment(TOPIC_2_UUID, 0, 2)
|
||||
));
|
||||
expectedAssignment.put(MEMBER_B, mkAssignment(
|
||||
mkTopicAssignment(TOPIC_3_UUID, 0, 1)
|
||||
));
|
||||
expectedAssignment.put(MEMBER_C, mkAssignment(
|
||||
mkTopicAssignment(TOPIC_2_UUID, 1, 2)
|
||||
));
|
||||
|
||||
// T1: 3 partitions + T2: 3 partitions + T3: 2 partitions = 8 partitions
|
||||
assertEveryPartitionGetsAssignment(8, computedAssignment);
|
||||
assertAssignment(expectedAssignment, computedAssignment);
|
||||
// T1: 2 partitions + T2: 2 partitions + T3: 1 partition = 5 partitions
|
||||
assertEveryPartitionGetsAssignment(5, computedAssignment);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -578,60 +543,6 @@ public class SimpleAssignorTest {
|
|||
assertAssignment(expectedAssignment, computedAssignment);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMemberHashAssignment() {
|
||||
// hashcode for "member1" is 948881623.
|
||||
String member1 = "member1";
|
||||
// hashcode for "member2" is 948881624.
|
||||
String member2 = "member2";
|
||||
// hashcode for "member3" is 948881625.
|
||||
String member3 = "member3";
|
||||
// hashcode for "member4" is 948881626.
|
||||
String member4 = "member4";
|
||||
// hashcode for "AaAaAaAa" is -540425984 to test with negative hashcode.
|
||||
String member5 = "AaAaAaAa";
|
||||
List<String> members = List.of(member1, member2, member3, member4, member5);
|
||||
|
||||
TopicIdPartition partition1 = new TopicIdPartition(TOPIC_1_UUID, 0);
|
||||
TopicIdPartition partition2 = new TopicIdPartition(TOPIC_2_UUID, 0);
|
||||
TopicIdPartition partition3 = new TopicIdPartition(TOPIC_3_UUID, 0);
|
||||
List<TopicIdPartition> partitions = List.of(partition1, partition2, partition3);
|
||||
|
||||
Map<TopicIdPartition, List<String>> computedAssignment = new HashMap<>();
|
||||
assignor.memberHashAssignment(members, partitions, computedAssignment);
|
||||
|
||||
Map<TopicIdPartition, List<String>> expectedAssignment = new HashMap<>();
|
||||
expectedAssignment.put(partition1, List.of(member3));
|
||||
expectedAssignment.put(partition2, List.of(member1, member4));
|
||||
expectedAssignment.put(partition3, List.of(member2, member5));
|
||||
assertAssignment(expectedAssignment, computedAssignment);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRoundRobinAssignment() {
|
||||
String member1 = "member1";
|
||||
String member2 = "member2";
|
||||
List<String> members = List.of(member1, member2);
|
||||
TopicIdPartition partition1 = new TopicIdPartition(TOPIC_1_UUID, 0);
|
||||
TopicIdPartition partition2 = new TopicIdPartition(TOPIC_2_UUID, 0);
|
||||
TopicIdPartition partition3 = new TopicIdPartition(TOPIC_3_UUID, 0);
|
||||
TopicIdPartition partition4 = new TopicIdPartition(TOPIC_4_UUID, 0);
|
||||
List<TopicIdPartition> unassignedPartitions = List.of(partition2, partition3, partition4);
|
||||
|
||||
Map<TopicIdPartition, List<String>> assignment = new HashMap<>();
|
||||
assignment.put(partition1, List.of(member1));
|
||||
|
||||
assignor.roundRobinAssignment(members, unassignedPartitions, assignment);
|
||||
Map<TopicIdPartition, List<String>> expectedAssignment = Map.of(
|
||||
partition1, List.of(member1),
|
||||
partition2, List.of(member1),
|
||||
partition3, List.of(member2),
|
||||
partition4, List.of(member1)
|
||||
);
|
||||
|
||||
assertAssignment(expectedAssignment, assignment);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrementalAssignmentIncreasingMembersHomogeneous() {
|
||||
final int numPartitions = 24;
|
||||
|
@ -859,6 +770,166 @@ public class SimpleAssignorTest {
|
|||
assertEveryPartitionGetsAssignment(9, computedAssignment2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrementalAssignmentIncreasingMembersHeterogeneous() {
|
||||
final int numPartitions = 24;
|
||||
final int numMembers = 101;
|
||||
|
||||
MetadataImage metadataImage = new MetadataImageBuilder()
|
||||
.addTopic(TOPIC_1_UUID, TOPIC_1_NAME, numPartitions / 2)
|
||||
.addTopic(TOPIC_2_UUID, TOPIC_2_NAME, numPartitions / 3)
|
||||
.addTopic(TOPIC_3_UUID, TOPIC_3_NAME, numPartitions / 6)
|
||||
.build();
|
||||
|
||||
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
|
||||
metadataImage
|
||||
);
|
||||
|
||||
ArrayList<Set<Uuid>> topicsSubscriptions = new ArrayList<>(3);
|
||||
Set<Uuid> topicsSubscription1 = new LinkedHashSet<>();
|
||||
topicsSubscription1.add(TOPIC_1_UUID);
|
||||
topicsSubscription1.add(TOPIC_2_UUID);
|
||||
topicsSubscription1.add(TOPIC_3_UUID);
|
||||
topicsSubscriptions.add(topicsSubscription1);
|
||||
Set<Uuid> topicsSubscription2 = new LinkedHashSet<>();
|
||||
topicsSubscription2.add(TOPIC_2_UUID);
|
||||
topicsSubscriptions.add(topicsSubscription2);
|
||||
Set<Uuid> topicsSubscription3 = new LinkedHashSet<>();
|
||||
topicsSubscription3.add(TOPIC_3_UUID);
|
||||
topicsSubscriptions.add(topicsSubscription3);
|
||||
Set<Uuid> topicsSubscription4 = new LinkedHashSet<>();
|
||||
topicsSubscription4.add(TOPIC_1_UUID);
|
||||
topicsSubscription4.add(TOPIC_2_UUID);
|
||||
topicsSubscriptions.add(topicsSubscription4);
|
||||
int numTopicsSubscriptions = 4;
|
||||
|
||||
Map<String, MemberSubscriptionAndAssignmentImpl> members = new HashMap<>();
|
||||
|
||||
SimpleAssignor assignor = new SimpleAssignor();
|
||||
|
||||
// Increase the number of members one a time, checking that the partitions are assigned as expected
|
||||
for (int member = 0; member < numMembers; member++) {
|
||||
String newMemberId = "M" + member;
|
||||
members.put(newMemberId, new MemberSubscriptionAndAssignmentImpl(
|
||||
Optional.empty(),
|
||||
Optional.empty(),
|
||||
topicsSubscriptions.get(member % numTopicsSubscriptions),
|
||||
Assignment.EMPTY
|
||||
));
|
||||
|
||||
GroupSpec groupSpec = new GroupSpecImpl(
|
||||
members,
|
||||
HETEROGENEOUS,
|
||||
new HashMap<>()
|
||||
);
|
||||
|
||||
GroupAssignment computedAssignment = assignor.assign(groupSpec, subscribedTopicMetadata);
|
||||
assertEveryPartitionGetsAssignment(numPartitions, computedAssignment);
|
||||
|
||||
for (int m = 0; m < member; m++) {
|
||||
String memberId = "M" + m;
|
||||
members.put(memberId, new MemberSubscriptionAndAssignmentImpl(
|
||||
Optional.empty(),
|
||||
Optional.empty(),
|
||||
topicsSubscriptions.get(m % numTopicsSubscriptions),
|
||||
new Assignment(computedAssignment.members().get(memberId).partitions())
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrementalAssignmentDecreasingMembersHeterogeneous() {
|
||||
final int numPartitions = 24;
|
||||
final int numMembers = 101;
|
||||
|
||||
MetadataImage metadataImage = new MetadataImageBuilder()
|
||||
.addTopic(TOPIC_1_UUID, TOPIC_1_NAME, numPartitions / 2)
|
||||
.addTopic(TOPIC_2_UUID, TOPIC_2_NAME, numPartitions / 3)
|
||||
.addTopic(TOPIC_3_UUID, TOPIC_3_NAME, numPartitions / 6)
|
||||
.build();
|
||||
|
||||
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(
|
||||
metadataImage
|
||||
);
|
||||
|
||||
ArrayList<Set<Uuid>> topicsSubscriptions = new ArrayList<>(3);
|
||||
Set<Uuid> topicsSubscription1 = new LinkedHashSet<>();
|
||||
topicsSubscription1.add(TOPIC_1_UUID);
|
||||
topicsSubscription1.add(TOPIC_2_UUID);
|
||||
topicsSubscription1.add(TOPIC_3_UUID);
|
||||
topicsSubscriptions.add(topicsSubscription1);
|
||||
Set<Uuid> topicsSubscription2 = new LinkedHashSet<>();
|
||||
topicsSubscription2.add(TOPIC_2_UUID);
|
||||
topicsSubscriptions.add(topicsSubscription2);
|
||||
Set<Uuid> topicsSubscription3 = new LinkedHashSet<>();
|
||||
topicsSubscription3.add(TOPIC_3_UUID);
|
||||
topicsSubscriptions.add(topicsSubscription3);
|
||||
Set<Uuid> topicsSubscription4 = new LinkedHashSet<>();
|
||||
topicsSubscription4.add(TOPIC_1_UUID);
|
||||
topicsSubscription4.add(TOPIC_2_UUID);
|
||||
topicsSubscriptions.add(topicsSubscription4);
|
||||
int numTopicsSubscriptions = 4;
|
||||
|
||||
Map<String, MemberSubscriptionAndAssignmentImpl> members = new HashMap<>();
|
||||
|
||||
SimpleAssignor assignor = new SimpleAssignor();
|
||||
|
||||
for (int member = 0; member < numMembers; member++) {
|
||||
String newMemberId = "M" + member;
|
||||
members.put(newMemberId, new MemberSubscriptionAndAssignmentImpl(
|
||||
Optional.empty(),
|
||||
Optional.empty(),
|
||||
topicsSubscriptions.get(member % numTopicsSubscriptions),
|
||||
Assignment.EMPTY
|
||||
));
|
||||
}
|
||||
|
||||
GroupSpec groupSpec = new GroupSpecImpl(
|
||||
members,
|
||||
HETEROGENEOUS,
|
||||
new HashMap<>()
|
||||
);
|
||||
|
||||
GroupAssignment computedAssignment = assignor.assign(groupSpec, subscribedTopicMetadata);
|
||||
assertEveryPartitionGetsAssignment(numPartitions, computedAssignment);
|
||||
|
||||
for (int member = 0; member < numMembers; member++) {
|
||||
String newMemberId = "M" + member;
|
||||
members.put(newMemberId, new MemberSubscriptionAndAssignmentImpl(
|
||||
Optional.empty(),
|
||||
Optional.empty(),
|
||||
topicsSubscriptions.get(member % numTopicsSubscriptions),
|
||||
new Assignment(computedAssignment.members().get(newMemberId).partitions()))
|
||||
);
|
||||
}
|
||||
|
||||
// Decrease the number of members one a time, checking that the partitions are assigned as expected
|
||||
for (int member = numMembers - 1; member > 0; member--) {
|
||||
String newMemberId = "M" + member;
|
||||
members.remove(newMemberId);
|
||||
|
||||
groupSpec = new GroupSpecImpl(
|
||||
members,
|
||||
HETEROGENEOUS,
|
||||
new HashMap<>()
|
||||
);
|
||||
|
||||
computedAssignment = assignor.assign(groupSpec, subscribedTopicMetadata);
|
||||
assertEveryPartitionGetsAssignment(numPartitions, computedAssignment);
|
||||
|
||||
for (int m = 0; m < member; m++) {
|
||||
String memberId = "M" + m;
|
||||
members.put(memberId, new MemberSubscriptionAndAssignmentImpl(
|
||||
Optional.empty(),
|
||||
Optional.empty(),
|
||||
topicsSubscriptions.get(m % numTopicsSubscriptions),
|
||||
new Assignment(computedAssignment.members().get(memberId).partitions())
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void assertAssignment(
|
||||
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment,
|
||||
GroupAssignment computedGroupAssignment
|
||||
|
@ -870,18 +941,6 @@ public class SimpleAssignorTest {
|
|||
}
|
||||
}
|
||||
|
||||
private void assertAssignment(
|
||||
Map<TopicIdPartition, List<String>> expectedAssignment,
|
||||
Map<TopicIdPartition, List<String>> computedAssignment
|
||||
) {
|
||||
assertEquals(expectedAssignment.size(), computedAssignment.size());
|
||||
expectedAssignment.forEach((topicIdPartition, members) -> {
|
||||
List<String> computedMembers = computedAssignment.getOrDefault(topicIdPartition, List.of());
|
||||
assertEquals(members.size(), computedMembers.size());
|
||||
members.forEach(member -> assertTrue(computedMembers.contains(member)));
|
||||
});
|
||||
}
|
||||
|
||||
private void assertEveryPartitionGetsAssignment(
|
||||
int expectedPartitions,
|
||||
GroupAssignment computedGroupAssignment
|
||||
|
|
Loading…
Reference in New Issue