mirror of https://github.com/apache/kafka.git
KAFKA-18688: Fix uniform homogeneous assignor stability (#19677)
When the number of partitions is not divisible by the number of members, some members will end up with one more partition than others. Previously, we required these to be the members at the start of the iteration order, which meant that partitions could be reassigned even when the previous assignment was already balanced.

Allow any member to have the extra partition, so that we do not move partitions around when the previous assignment is already balanced.

Before the PR
```
Benchmark                                 (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionType)  (topicCount)  Mode  Cnt    Score  Error  Units
ServerSideAssignorBenchmark.doAssignment              FULL           RANGE          false          10000                         50         HOMOGENEOUS          1000  avgt    2   26.175         ms/op
ServerSideAssignorBenchmark.doAssignment              FULL           RANGE          false          10000                         50       HETEROGENEOUS          1000  avgt    2  123.955         ms/op
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL           RANGE          false          10000                         50         HOMOGENEOUS          1000  avgt    2   24.408         ms/op
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL           RANGE          false          10000                         50       HETEROGENEOUS          1000  avgt    2  114.873         ms/op
```

After the PR
```
Benchmark                                 (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionType)  (topicCount)  Mode  Cnt    Score  Error  Units
ServerSideAssignorBenchmark.doAssignment              FULL           RANGE          false          10000                         50         HOMOGENEOUS          1000  avgt    2   24.259         ms/op
ServerSideAssignorBenchmark.doAssignment              FULL           RANGE          false          10000                         50       HETEROGENEOUS          1000  avgt    2  118.513         ms/op
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL           RANGE          false          10000                         50         HOMOGENEOUS          1000  avgt    2   24.636         ms/op
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL           RANGE          false          10000                         50       HETEROGENEOUS          1000  avgt    2  115.503         ms/op
```

Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
parent
d5ce463ed3
commit
c16c240bd1
|
@ -152,8 +152,13 @@ public class UniformHomogeneousAssignmentBuilder {
|
||||||
* This method ensures that the original assignment is not copied if it is not
|
* This method ensures that the original assignment is not copied if it is not
|
||||||
* altered.
|
* altered.
|
||||||
*/
|
*/
|
||||||
|
@SuppressWarnings({"CyclomaticComplexity", "NPathComplexity"})
|
||||||
private void maybeRevokePartitions() {
|
private void maybeRevokePartitions() {
|
||||||
|
int memberCount = groupSpec.memberIds().size();
|
||||||
|
int memberIndex = -1;
|
||||||
for (String memberId : groupSpec.memberIds()) {
|
for (String memberId : groupSpec.memberIds()) {
|
||||||
|
memberIndex++;
|
||||||
|
|
||||||
Map<Uuid, Set<Integer>> oldAssignment = groupSpec.memberAssignment(memberId).partitions();
|
Map<Uuid, Set<Integer>> oldAssignment = groupSpec.memberAssignment(memberId).partitions();
|
||||||
Map<Uuid, Set<Integer>> newAssignment = null;
|
Map<Uuid, Set<Integer>> newAssignment = null;
|
||||||
|
|
||||||
|
@ -164,9 +169,10 @@ public class UniformHomogeneousAssignmentBuilder {
|
||||||
}
|
}
|
||||||
|
|
||||||
int quota = minimumMemberQuota;
|
int quota = minimumMemberQuota;
|
||||||
|
boolean quotaHasExtraPartition = false;
|
||||||
if (remainingMembersToGetAnExtraPartition > 0) {
|
if (remainingMembersToGetAnExtraPartition > 0) {
|
||||||
quota++;
|
quota++;
|
||||||
remainingMembersToGetAnExtraPartition--;
|
quotaHasExtraPartition = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (Map.Entry<Uuid, Set<Integer>> topicPartitions : oldAssignment.entrySet()) {
|
for (Map.Entry<Uuid, Set<Integer>> topicPartitions : oldAssignment.entrySet()) {
|
||||||
|
@ -208,10 +214,24 @@ public class UniformHomogeneousAssignmentBuilder {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (quota > 0 &&
|
||||||
|
quotaHasExtraPartition &&
|
||||||
|
memberCount - memberIndex > remainingMembersToGetAnExtraPartition) {
|
||||||
|
// Give up the extra partition quota for another member to claim,
|
||||||
|
// unless this member is one of the last remainingMembersToGetAnExtraPartition
|
||||||
|
// members in the list and must take the extra partition.
|
||||||
|
quota--;
|
||||||
|
quotaHasExtraPartition = false;
|
||||||
|
}
|
||||||
|
|
||||||
if (quota > 0) {
|
if (quota > 0) {
|
||||||
unfilledMembers.add(new MemberWithRemainingQuota(memberId, quota));
|
unfilledMembers.add(new MemberWithRemainingQuota(memberId, quota));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (quotaHasExtraPartition) {
|
||||||
|
remainingMembersToGetAnExtraPartition--;
|
||||||
|
}
|
||||||
|
|
||||||
if (newAssignment == null) {
|
if (newAssignment == null) {
|
||||||
targetAssignment.put(memberId, new MemberAssignmentImpl(oldAssignment));
|
targetAssignment.put(memberId, new MemberAssignmentImpl(oldAssignment));
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -161,11 +161,10 @@ public class OptimizedUniformAssignmentBuilderTest {
|
||||||
|
|
||||||
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
|
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
|
||||||
expectedAssignment.put(memberA, mkAssignment(
|
expectedAssignment.put(memberA, mkAssignment(
|
||||||
mkTopicAssignment(topic1Uuid, 0),
|
|
||||||
mkTopicAssignment(topic3Uuid, 0, 1)
|
mkTopicAssignment(topic3Uuid, 0, 1)
|
||||||
));
|
));
|
||||||
expectedAssignment.put(memberB, mkAssignment(
|
expectedAssignment.put(memberB, mkAssignment(
|
||||||
mkTopicAssignment(topic1Uuid, 1, 2)
|
mkTopicAssignment(topic1Uuid, 0, 1, 2)
|
||||||
));
|
));
|
||||||
|
|
||||||
GroupSpec groupSpec = new GroupSpecImpl(
|
GroupSpec groupSpec = new GroupSpecImpl(
|
||||||
|
@ -218,15 +217,15 @@ public class OptimizedUniformAssignmentBuilderTest {
|
||||||
|
|
||||||
// Topic 3 has 2 partitions but three members subscribed to it - one of them should not get an assignment.
|
// Topic 3 has 2 partitions but three members subscribed to it - one of them should not get an assignment.
|
||||||
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
|
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
|
||||||
expectedAssignment.put(memberA, mkAssignment(
|
expectedAssignment.put(memberA,
|
||||||
mkTopicAssignment(topic3Uuid, 0)
|
|
||||||
));
|
|
||||||
expectedAssignment.put(memberB, mkAssignment(
|
|
||||||
mkTopicAssignment(topic3Uuid, 1)
|
|
||||||
));
|
|
||||||
expectedAssignment.put(memberC,
|
|
||||||
Map.of()
|
Map.of()
|
||||||
);
|
);
|
||||||
|
expectedAssignment.put(memberB, mkAssignment(
|
||||||
|
mkTopicAssignment(topic3Uuid, 0)
|
||||||
|
));
|
||||||
|
expectedAssignment.put(memberC, mkAssignment(
|
||||||
|
mkTopicAssignment(topic3Uuid, 1)
|
||||||
|
));
|
||||||
|
|
||||||
GroupSpec groupSpec = new GroupSpecImpl(
|
GroupSpec groupSpec = new GroupSpecImpl(
|
||||||
members,
|
members,
|
||||||
|
@ -382,11 +381,11 @@ public class OptimizedUniformAssignmentBuilderTest {
|
||||||
|
|
||||||
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
|
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
|
||||||
expectedAssignment.put(memberA, mkAssignment(
|
expectedAssignment.put(memberA, mkAssignment(
|
||||||
mkTopicAssignment(topic1Uuid, 0, 2, 3),
|
mkTopicAssignment(topic1Uuid, 0, 2),
|
||||||
mkTopicAssignment(topic2Uuid, 0, 3, 4)
|
mkTopicAssignment(topic2Uuid, 0, 3, 4)
|
||||||
));
|
));
|
||||||
expectedAssignment.put(memberB, mkAssignment(
|
expectedAssignment.put(memberB, mkAssignment(
|
||||||
mkTopicAssignment(topic1Uuid, 1, 4, 5),
|
mkTopicAssignment(topic1Uuid, 1, 3, 4, 5),
|
||||||
mkTopicAssignment(topic2Uuid, 1, 2)
|
mkTopicAssignment(topic2Uuid, 1, 2)
|
||||||
));
|
));
|
||||||
|
|
||||||
|
@ -603,6 +602,75 @@ public class OptimizedUniformAssignmentBuilderTest {
|
||||||
checkValidityAndBalance(members, computedAssignment);
|
checkValidityAndBalance(members, computedAssignment);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReassignmentStickinessWhenAlreadyBalanced() {
|
||||||
|
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
|
||||||
|
topicMetadata.put(topic1Uuid, new TopicMetadata(
|
||||||
|
topic1Uuid,
|
||||||
|
topic1Name,
|
||||||
|
5
|
||||||
|
));
|
||||||
|
|
||||||
|
// A TreeMap ensures that memberA is first in the iteration order.
|
||||||
|
Map<String, MemberSubscriptionAndAssignmentImpl> members = new TreeMap<>();
|
||||||
|
|
||||||
|
// Two members must have extra partitions. In the previous assignment, they were members A
|
||||||
|
// and C.
|
||||||
|
members.put(memberA, new MemberSubscriptionAndAssignmentImpl(
|
||||||
|
Optional.empty(),
|
||||||
|
Optional.empty(),
|
||||||
|
Set.of(topic1Uuid),
|
||||||
|
new Assignment(mkAssignment(
|
||||||
|
mkTopicAssignment(topic1Uuid, 0, 3)
|
||||||
|
))
|
||||||
|
));
|
||||||
|
|
||||||
|
members.put(memberB, new MemberSubscriptionAndAssignmentImpl(
|
||||||
|
Optional.empty(),
|
||||||
|
Optional.empty(),
|
||||||
|
Set.of(topic1Uuid, topic2Uuid),
|
||||||
|
new Assignment(mkAssignment(
|
||||||
|
mkTopicAssignment(topic1Uuid, 1)
|
||||||
|
))
|
||||||
|
));
|
||||||
|
|
||||||
|
members.put(memberC, new MemberSubscriptionAndAssignmentImpl(
|
||||||
|
Optional.empty(),
|
||||||
|
Optional.empty(),
|
||||||
|
Set.of(topic1Uuid, topic2Uuid),
|
||||||
|
new Assignment(mkAssignment(
|
||||||
|
mkTopicAssignment(topic1Uuid, 2, 4)
|
||||||
|
))
|
||||||
|
));
|
||||||
|
|
||||||
|
// Members A and C should keep their partitions.
|
||||||
|
Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>();
|
||||||
|
expectedAssignment.put(memberA, mkAssignment(
|
||||||
|
mkTopicAssignment(topic1Uuid, 0, 3)
|
||||||
|
));
|
||||||
|
expectedAssignment.put(memberB, mkAssignment(
|
||||||
|
mkTopicAssignment(topic1Uuid, 1)
|
||||||
|
));
|
||||||
|
expectedAssignment.put(memberC, mkAssignment(
|
||||||
|
mkTopicAssignment(topic1Uuid, 2, 4)
|
||||||
|
));
|
||||||
|
|
||||||
|
GroupSpec groupSpec = new GroupSpecImpl(
|
||||||
|
members,
|
||||||
|
HOMOGENEOUS,
|
||||||
|
invertedTargetAssignment(members)
|
||||||
|
);
|
||||||
|
SubscribedTopicDescriberImpl subscribedTopicMetadata = new SubscribedTopicDescriberImpl(topicMetadata);
|
||||||
|
|
||||||
|
GroupAssignment computedAssignment = assignor.assign(
|
||||||
|
groupSpec,
|
||||||
|
subscribedTopicMetadata
|
||||||
|
);
|
||||||
|
|
||||||
|
assertAssignment(expectedAssignment, computedAssignment);
|
||||||
|
checkValidityAndBalance(members, computedAssignment);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Verifies that the given assignment is valid with respect to the given subscriptions.
|
* Verifies that the given assignment is valid with respect to the given subscriptions.
|
||||||
* Validity requirements:
|
* Validity requirements:
|
||||||
|
|
Loading…
Reference in New Issue