diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHomogeneousAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHomogeneousAssignmentBuilder.java index 75e831edd86..1846336ea67 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHomogeneousAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHomogeneousAssignmentBuilder.java @@ -152,8 +152,13 @@ public class UniformHomogeneousAssignmentBuilder { * This method ensures that the original assignment is not copied if it is not * altered. */ + @SuppressWarnings({"CyclomaticComplexity", "NPathComplexity"}) private void maybeRevokePartitions() { + int memberCount = groupSpec.memberIds().size(); + int memberIndex = -1; for (String memberId : groupSpec.memberIds()) { + memberIndex++; + Map> oldAssignment = groupSpec.memberAssignment(memberId).partitions(); Map> newAssignment = null; @@ -164,9 +169,10 @@ public class UniformHomogeneousAssignmentBuilder { } int quota = minimumMemberQuota; + boolean quotaHasExtraPartition = false; if (remainingMembersToGetAnExtraPartition > 0) { quota++; - remainingMembersToGetAnExtraPartition--; + quotaHasExtraPartition = true; } for (Map.Entry> topicPartitions : oldAssignment.entrySet()) { @@ -208,10 +214,24 @@ public class UniformHomogeneousAssignmentBuilder { } } + if (quota > 0 && + quotaHasExtraPartition && + memberCount - memberIndex > remainingMembersToGetAnExtraPartition) { + // Give up the extra partition quota for another member to claim, + // unless this member is one of the last remainingMembersToGetAnExtraPartition + // members in the list and must take the extra partition. + quota--; + quotaHasExtraPartition = false; + } + if (quota > 0) { unfilledMembers.add(new MemberWithRemainingQuota(memberId, quota)); } + if (quotaHasExtraPartition) { + remainingMembersToGetAnExtraPartition--; + } + if (newAssignment == null) { targetAssignment.put(memberId, new MemberAssignmentImpl(oldAssignment)); } else { diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java index e47aadb482a..d37b75ce9fe 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java @@ -161,11 +161,10 @@ public class OptimizedUniformAssignmentBuilderTest { Map>> expectedAssignment = new HashMap<>(); expectedAssignment.put(memberA, mkAssignment( - mkTopicAssignment(topic1Uuid, 0), mkTopicAssignment(topic3Uuid, 0, 1) )); expectedAssignment.put(memberB, mkAssignment( - mkTopicAssignment(topic1Uuid, 1, 2) + mkTopicAssignment(topic1Uuid, 0, 1, 2) )); GroupSpec groupSpec = new GroupSpecImpl( @@ -218,15 +217,15 @@ public class OptimizedUniformAssignmentBuilderTest { // Topic 3 has 2 partitions but three members subscribed to it - one of them should not get an assignment. Map>> expectedAssignment = new HashMap<>(); - expectedAssignment.put(memberA, mkAssignment( - mkTopicAssignment(topic3Uuid, 0) - )); - expectedAssignment.put(memberB, mkAssignment( - mkTopicAssignment(topic3Uuid, 1) - )); - expectedAssignment.put(memberC, + expectedAssignment.put(memberA, Map.of() ); + expectedAssignment.put(memberB, mkAssignment( + mkTopicAssignment(topic3Uuid, 0) + )); + expectedAssignment.put(memberC, mkAssignment( + mkTopicAssignment(topic3Uuid, 1) + )); GroupSpec groupSpec = new GroupSpecImpl( members, @@ -382,11 +381,11 @@ public class OptimizedUniformAssignmentBuilderTest { Map>> expectedAssignment = new HashMap<>(); expectedAssignment.put(memberA, mkAssignment( - mkTopicAssignment(topic1Uuid, 0, 2, 3), + mkTopicAssignment(topic1Uuid, 0, 2), mkTopicAssignment(topic2Uuid, 0, 3, 4) )); expectedAssignment.put(memberB, mkAssignment( - mkTopicAssignment(topic1Uuid, 1, 4, 5), + mkTopicAssignment(topic1Uuid, 1, 3, 4, 5), mkTopicAssignment(topic2Uuid, 1, 2) )); @@ -603,6 +602,75 @@ public class OptimizedUniformAssignmentBuilderTest { checkValidityAndBalance(members, computedAssignment); } + @Test + public void testReassignmentStickinessWhenAlreadyBalanced() { + Map topicMetadata = new HashMap<>(); + topicMetadata.put(topic1Uuid, new TopicMetadata( + topic1Uuid, + topic1Name, + 5 + )); + + // A TreeMap ensures that memberA is first in the iteration order. + Map members = new TreeMap<>(); + + // Two members must have extra partitions. In the previous assignment, they were members A + // and C. + members.put(memberA, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + Optional.empty(), + Set.of(topic1Uuid), + new Assignment(mkAssignment( + mkTopicAssignment(topic1Uuid, 0, 3) + )) + )); + + members.put(memberB, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + Optional.empty(), + Set.of(topic1Uuid, topic2Uuid), + new Assignment(mkAssignment( + mkTopicAssignment(topic1Uuid, 1) + )) + )); + + members.put(memberC, new MemberSubscriptionAndAssignmentImpl( + Optional.empty(), + Optional.empty(), + Set.of(topic1Uuid, topic2Uuid), + new Assignment(mkAssignment( + mkTopicAssignment(topic1Uuid, 2, 4) + )) + )); + + // Members A and C should keep their partitions. + Map>> expectedAssignment = new HashMap<>(); + expectedAssignment.put(memberA, mkAssignment( + mkTopicAssignment(topic1Uuid, 0, 3) + )); + expectedAssignment.put(memberB, mkAssignment( + mkTopicAssignment(topic1Uuid, 1) + )); + expectedAssignment.put(memberC, mkAssignment( + mkTopicAssignment(topic1Uuid, 2, 4) + )); + + GroupSpec groupSpec = new GroupSpecImpl( + members, + HOMOGENEOUS, + invertedTargetAssignment(members) + ); + SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata); + + GroupAssignment computedAssignment = assignor.assign( + groupSpec, + subscribedTopicMetadata + ); + + assertAssignment(expectedAssignment, computedAssignment); + checkValidityAndBalance(members, computedAssignment); + } + /** * Verifies that the given assignment is valid with respect to the given subscriptions. * Validity requirements: