KAFKA-18688: Fix uniform homogeneous assignor stability (#19677)
CI / build (push) Waiting to run Details

When the number of partitions is not divisible by the number of members,
some members will end up with one more partition than others.
Previously, we required these to be the members at the start of the
iteration order, which meant that partitions could be reassigned even
when the previous assignment was already balanced.

Allow any member to have the extra partition, so that we do not move
partitions around when the previous assignment is already balanced.

Before the PR
```
Benchmark                                 (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionType)  (topicCount)  Mode  Cnt    Score   Error  Units
ServerSideAssignorBenchmark.doAssignment              FULL           RANGE          false          10000                         50         HOMOGENEOUS          1000  avgt    2   26.175          ms/op
ServerSideAssignorBenchmark.doAssignment              FULL           RANGE          false          10000                         50       HETEROGENEOUS          1000  avgt    2  123.955          ms/op
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL           RANGE          false          10000                         50         HOMOGENEOUS          1000  avgt    2   24.408          ms/op
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL           RANGE          false          10000                         50       HETEROGENEOUS          1000  avgt    2  114.873          ms/op
```
After the PR
```
Benchmark                                 (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionType)  (topicCount)  Mode  Cnt    Score   Error  Units
ServerSideAssignorBenchmark.doAssignment              FULL           RANGE          false          10000                         50         HOMOGENEOUS          1000  avgt    2   24.259          ms/op
ServerSideAssignorBenchmark.doAssignment              FULL           RANGE          false          10000                         50       HETEROGENEOUS          1000  avgt    2  118.513          ms/op
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL           RANGE          false          10000                         50         HOMOGENEOUS          1000  avgt    2   24.636          ms/op
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL           RANGE          false          10000                         50       HETEROGENEOUS          1000  avgt    2  115.503          ms/op
```

Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
Sean Quah 2025-05-13 16:01:14 +01:00 committed by David Jacot
parent 48f75616d7
commit d64a97099d
2 changed files with 100 additions and 12 deletions

View File

@ -153,8 +153,13 @@ public class UniformHomogeneousAssignmentBuilder {
* This method ensures that the original assignment is not copied if it is not * This method ensures that the original assignment is not copied if it is not
* altered. * altered.
*/ */
@SuppressWarnings({"CyclomaticComplexity", "NPathComplexity"})
private void maybeRevokePartitions() { private void maybeRevokePartitions() {
int memberCount = groupSpec.memberIds().size();
int memberIndex = -1;
for (String memberId : groupSpec.memberIds()) { for (String memberId : groupSpec.memberIds()) {
memberIndex++;
Map<Uuid, Set<Integer>> oldAssignment = groupSpec.memberAssignment(memberId).partitions(); Map<Uuid, Set<Integer>> oldAssignment = groupSpec.memberAssignment(memberId).partitions();
Map<Uuid, Set<Integer>> newAssignment = null; Map<Uuid, Set<Integer>> newAssignment = null;
@ -165,9 +170,10 @@ public class UniformHomogeneousAssignmentBuilder {
} }
int quota = minimumMemberQuota; int quota = minimumMemberQuota;
boolean quotaHasExtraPartition = false;
if (remainingMembersToGetAnExtraPartition > 0) { if (remainingMembersToGetAnExtraPartition > 0) {
quota++; quota++;
remainingMembersToGetAnExtraPartition--; quotaHasExtraPartition = true;
} }
for (Map.Entry<Uuid, Set<Integer>> topicPartitions : oldAssignment.entrySet()) { for (Map.Entry<Uuid, Set<Integer>> topicPartitions : oldAssignment.entrySet()) {
@ -209,10 +215,24 @@ public class UniformHomogeneousAssignmentBuilder {
} }
} }
if (quota > 0 &&
quotaHasExtraPartition &&
memberCount - memberIndex > remainingMembersToGetAnExtraPartition) {
// Give up the extra partition quota for another member to claim,
// unless this member is one of the last remainingMembersToGetAnExtraPartition
// members in the list and must take the extra partition.
quota--;
quotaHasExtraPartition = false;
}
if (quota > 0) { if (quota > 0) {
unfilledMembers.add(new MemberWithRemainingQuota(memberId, quota)); unfilledMembers.add(new MemberWithRemainingQuota(memberId, quota));
} }
if (quotaHasExtraPartition) {
remainingMembersToGetAnExtraPartition--;
}
if (newAssignment == null) { if (newAssignment == null) {
targetAssignment.put(memberId, new MemberAssignmentImpl(oldAssignment)); targetAssignment.put(memberId, new MemberAssignmentImpl(oldAssignment));
} else { } else {

View File

@ -162,11 +162,10 @@ public class OptimizedUniformAssignmentBuilderTest {
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>(); Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment( expectedAssignment.put(memberA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0),
mkTopicAssignment(topic3Uuid, 0, 1) mkTopicAssignment(topic3Uuid, 0, 1)
)); ));
expectedAssignment.put(memberB, mkAssignment( expectedAssignment.put(memberB, mkAssignment(
mkTopicAssignment(topic1Uuid, 1, 2) mkTopicAssignment(topic1Uuid, 0, 1, 2)
)); ));
GroupSpec groupSpec = new GroupSpecImpl( GroupSpec groupSpec = new GroupSpecImpl(
@ -219,15 +218,15 @@ public class OptimizedUniformAssignmentBuilderTest {
// Topic 3 has 2 partitions but three members subscribed to it - one of them should not get an assignment. // Topic 3 has 2 partitions but three members subscribed to it - one of them should not get an assignment.
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>(); Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment( expectedAssignment.put(memberA,
mkTopicAssignment(topic3Uuid, 0)
));
expectedAssignment.put(memberB, mkAssignment(
mkTopicAssignment(topic3Uuid, 1)
));
expectedAssignment.put(memberC,
Collections.emptyMap() Collections.emptyMap()
); );
expectedAssignment.put(memberB, mkAssignment(
mkTopicAssignment(topic3Uuid, 0)
));
expectedAssignment.put(memberC, mkAssignment(
mkTopicAssignment(topic3Uuid, 1)
));
GroupSpec groupSpec = new GroupSpecImpl( GroupSpec groupSpec = new GroupSpecImpl(
members, members,
@ -383,11 +382,11 @@ public class OptimizedUniformAssignmentBuilderTest {
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>(); Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment( expectedAssignment.put(memberA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 2, 3), mkTopicAssignment(topic1Uuid, 0, 2),
mkTopicAssignment(topic2Uuid, 0, 3, 4) mkTopicAssignment(topic2Uuid, 0, 3, 4)
)); ));
expectedAssignment.put(memberB, mkAssignment( expectedAssignment.put(memberB, mkAssignment(
mkTopicAssignment(topic1Uuid, 1, 4, 5), mkTopicAssignment(topic1Uuid, 1, 3, 4, 5),
mkTopicAssignment(topic2Uuid, 1, 2) mkTopicAssignment(topic2Uuid, 1, 2)
)); ));
@ -604,6 +603,75 @@ public class OptimizedUniformAssignmentBuilderTest {
checkValidityAndBalance(members, computedAssignment); checkValidityAndBalance(members, computedAssignment);
} }
@Test
public void testReassignmentStickinessWhenAlreadyBalanced() {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
5
));
// A TreeMap ensures that memberA is first in the iteration order.
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
// Two members must have extra partitions. In the previous assignment, they were members A
// and C.
members.put(memberA, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
Set.of(topic1Uuid),
new Assignment(mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 3)
))
));
members.put(memberB, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
Set.of(topic1Uuid, topic2Uuid),
new Assignment(mkAssignment(
mkTopicAssignment(topic1Uuid, 1)
))
));
members.put(memberC, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
Set.of(topic1Uuid, topic2Uuid),
new Assignment(mkAssignment(
mkTopicAssignment(topic1Uuid, 2, 4)
))
));
// Members A and C should keep their partitions.
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
expectedAssignment.put(memberA, mkAssignment(
mkTopicAssignment(topic1Uuid, 0, 3)
));
expectedAssignment.put(memberB, mkAssignment(
mkTopicAssignment(topic1Uuid, 1)
));
expectedAssignment.put(memberC, mkAssignment(
mkTopicAssignment(topic1Uuid, 2, 4)
));
GroupSpec groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
invertedTargetAssignment(members)
);
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
GroupAssignment computedAssignment = assignor.assign(
groupSpec,
subscribedTopicMetadata
);
assertAssignment(expectedAssignment, computedAssignment);
checkValidityAndBalance(members, computedAssignment);
}
/** /**
* Verifies that the given assignment is valid with respect to the given subscriptions. * Verifies that the given assignment is valid with respect to the given subscriptions.
* Validity requirements: * Validity requirements: