MINOR: Add comments to constrainedAssign and generalAssign method (#9096)

Enhance the understandability for constrainedAssign and generalAssign method by getting more detailed meta comments.

Co-authored-by: A. Sophie Blee-Goldman <ableegoldman@gmail.com>

Reviewers: Boyang Chen <boyang@confluent.io>, A. Sophie Blee-Goldman <ableegoldman@gmail.com>
This commit is contained in:
showuon 2020-08-04 02:51:20 +08:00 committed by GitHub
parent 5c2991aff6
commit 28b7d8e216
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 38 additions and 1 deletions

View File

@ -144,6 +144,23 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
return true;
}
/**
* This constrainedAssign optimizes the assignment algorithm when all consumers were subscribed to same set of topics.
* The method includes the following steps:
*
* 1. Reassign as many previously owned partitions as possible, up to the maxQuota
* 2. Fill remaining members up to minQuota
* 3. If we ran out of unassigned partitions before filling all consumers, we need to start stealing partitions
* from the over-full consumers at max capacity
* 4. Otherwise we may have run out of unfilled consumers before assigning all partitions, in which case we
* should just distribute one partition each to all consumers at min capacity
*
* @param partitionsPerTopic The number of partitions for each subscribed topic
* @param consumerToOwnedPartitions Each consumer's previously owned and still-subscribed partitions
*
* @return Map from each member to the list of partitions assigned to them.
*/
private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer> partitionsPerTopic,
Map<String, List<TopicPartition>> consumerToOwnedPartitions) {
SortedSet<TopicPartition> unassignedPartitions = getTopicPartitions(partitionsPerTopic);
@ -151,8 +168,11 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
Set<TopicPartition> allRevokedPartitions = new HashSet<>();
// Each consumer should end up in exactly one of the below
// the consumers not yet at capacity
List<String> unfilledMembers = new LinkedList<>();
// the members with exactly maxQuota partitions assigned
Queue<String> maxCapacityMembers = new LinkedList<>();
// the members with exactly minQuota partitions assigned
Queue<String> minCapacityMembers = new LinkedList<>();
int numberOfConsumers = consumerToOwnedPartitions.size();
@ -163,6 +183,7 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
Map<String, List<TopicPartition>> assignment = new HashMap<>(
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(minQuota))));
// Reassign as many previously owned partitions as possible
for (Map.Entry<String, List<TopicPartition>> consumerEntry : consumerToOwnedPartitions.entrySet()) {
String consumer = consumerEntry.getKey();
List<TopicPartition> ownedPartitions = consumerEntry.getValue();
@ -194,6 +215,7 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
Collections.sort(unfilledMembers);
Iterator<TopicPartition> unassignedPartitionsIter = unassignedPartitions.iterator();
// Fill remaining members up to minQuota
while (!unfilledMembers.isEmpty() && !unassignedPartitions.isEmpty()) {
Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
@ -268,6 +290,21 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
return allPartitions;
}
/**
* This generalAssign algorithm guarantees the assignment that is as balanced as possible.
* This method includes the following steps:
*
* 1. Preserving all the existing partition assignments
* 2. Removing all the partition assignments that have become invalid due to the change that triggers the reassignment
* 3. Assigning the unassigned partitions in a way that balances out the overall assignments of partitions to consumers
* 4. Further balancing out the resulting assignment by finding the partitions that can be reassigned
* to another consumer towards an overall more balanced assignment.
*
* @param partitionsPerTopic The number of partitions for each subscribed topic.
* @param subscriptions Map from the member id to their respective topic subscription
*
* @return Map from each member to the list of partitions assigned to them.
*/
private Map<String, List<TopicPartition>> generalAssign(Map<String, Integer> partitionsPerTopic,
Map<String, Subscription> subscriptions) {
Map<String, List<TopicPartition>> currentAssignment = new HashMap<>();
@ -966,4 +1003,4 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
return false;
}
}
}
}