From 28b7d8e21656649fb09b09f9bacfe865b0ca133c Mon Sep 17 00:00:00 2001 From: showuon <43372967+showuon@users.noreply.github.com> Date: Tue, 4 Aug 2020 02:51:20 +0800 Subject: [PATCH] MINOR: Add comments to constrainedAssign and generalAssign method (#9096) Improve the readability of the constrainedAssign and generalAssign methods by adding more detailed comments. Co-authored-by: A. Sophie Blee-Goldman Reviewers: Boyang Chen , A. Sophie Blee-Goldman --- .../internals/AbstractStickyAssignor.java | 39 ++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java index 629f05acdac..7e42e44a7d4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java @@ -144,6 +144,23 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor { return true; } + + /** + * This constrainedAssign optimizes the assignment algorithm when all consumers are subscribed to the same set of topics. + * The method includes the following steps: + * + * 1. Reassign as many previously owned partitions as possible, up to the maxQuota + * 2. Fill remaining members up to minQuota + * 3. If we ran out of unassigned partitions before filling all consumers, we need to start stealing partitions + * from the over-full consumers at max capacity + * 4. 
Otherwise we may have run out of unfilled consumers before assigning all partitions, in which case we + * should just distribute one partition each to all consumers at min capacity + * + * @param partitionsPerTopic The number of partitions for each subscribed topic + * @param consumerToOwnedPartitions Each consumer's previously owned and still-subscribed partitions + * + * @return Map from each member to the list of partitions assigned to them. + */ private Map> constrainedAssign(Map partitionsPerTopic, Map> consumerToOwnedPartitions) { SortedSet unassignedPartitions = getTopicPartitions(partitionsPerTopic); @@ -151,8 +168,11 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor { Set allRevokedPartitions = new HashSet<>(); // Each consumer should end up in exactly one of the below + // the consumers not yet at capacity List unfilledMembers = new LinkedList<>(); + // the members with exactly maxQuota partitions assigned Queue maxCapacityMembers = new LinkedList<>(); + // the members with exactly minQuota partitions assigned Queue minCapacityMembers = new LinkedList<>(); int numberOfConsumers = consumerToOwnedPartitions.size(); @@ -163,6 +183,7 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor { Map> assignment = new HashMap<>( consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(minQuota)))); + // Reassign as many previously owned partitions as possible for (Map.Entry> consumerEntry : consumerToOwnedPartitions.entrySet()) { String consumer = consumerEntry.getKey(); List ownedPartitions = consumerEntry.getValue(); @@ -194,6 +215,7 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor { Collections.sort(unfilledMembers); Iterator unassignedPartitionsIter = unassignedPartitions.iterator(); + // Fill remaining members up to minQuota while (!unfilledMembers.isEmpty() && !unassignedPartitions.isEmpty()) { Iterator unfilledConsumerIter = 
unfilledMembers.iterator(); @@ -268,6 +290,21 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor { return allPartitions; } + /** + * This generalAssign algorithm guarantees an assignment that is as balanced as possible. + * This method includes the following steps: + * + * 1. Preserving all the existing partition assignments + * 2. Removing all the partition assignments that have become invalid due to the change that triggers the reassignment + * 3. Assigning the unassigned partitions in a way that balances out the overall assignments of partitions to consumers + * 4. Further balancing out the resulting assignment by finding the partitions that can be reassigned + * to another consumer towards an overall more balanced assignment. + * + * @param partitionsPerTopic The number of partitions for each subscribed topic. + * @param subscriptions Map from each member id to its respective topic subscription + * + * @return Map from each member to the list of partitions assigned to them. + */ private Map> generalAssign(Map partitionsPerTopic, Map subscriptions) { Map> currentAssignment = new HashMap<>(); @@ -966,4 +1003,4 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor { return false; } } -} \ No newline at end of file +}