KAFKA-12464: minor code cleanup and additional logging in constrained sticky assignment (#10645)

This is the follow up PR to address the remaining comments in #10509.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This commit is contained in:
Luke Chen 2021-05-09 09:11:40 +08:00 committed by GitHub
parent 8f8f914efc
commit f1ef21f70a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 33 additions and 20 deletions

View File

@ -178,17 +178,17 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
int minQuota = (int) Math.floor(((double) totalPartitionsCount) / numberOfConsumers);
int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / numberOfConsumers);
// the expected number of members with maxQuota assignment
int expectedNumMembersHavingMorePartitions = totalPartitionsCount % numberOfConsumers;
// the number of members with exactly maxQuota partitions assigned
int numMembersHavingMorePartitions = 0;
// the expected number of members with over minQuota assignment
int expectedNumMembersAssignedOverMinQuota = totalPartitionsCount % numberOfConsumers;
// the number of members with over minQuota partitions assigned
int numMembersAssignedOverMinQuota = 0;
// initialize the assignment map with an empty array of size maxQuota for all members
Map<String, List<TopicPartition>> assignment = new HashMap<>(
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(maxQuota))));
List<TopicPartition> assignedPartitions = new ArrayList<>();
// Reassign as many previously owned partitions as possible
// Reassign previously owned partitions to the expected number
for (Map.Entry<String, List<TopicPartition>> consumerEntry : consumerToOwnedPartitions.entrySet()) {
String consumer = consumerEntry.getKey();
List<TopicPartition> ownedPartitions = consumerEntry.getValue();
@ -203,10 +203,10 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
assignedPartitions.addAll(ownedPartitions);
}
unfilledMembers.add(consumer);
} else if (ownedPartitions.size() >= maxQuota && numMembersHavingMorePartitions < expectedNumMembersHavingMorePartitions) {
// consumer owned the "maxQuota" of partitions or more, and we're still under the number of expected max capacity members
// so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions
numMembersHavingMorePartitions++;
} else if (ownedPartitions.size() >= maxQuota && numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota) {
// consumer owned the "maxQuota" of partitions or more, and we're still under the number of expected members
// with more than the minQuota partitions, so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions
numMembersAssignedOverMinQuota++;
List<TopicPartition> maxQuotaPartitions = ownedPartitions.subList(0, maxQuota);
consumerAssignment.addAll(maxQuotaPartitions);
assignedPartitions.addAll(maxQuotaPartitions);
@ -218,8 +218,10 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
consumerAssignment.addAll(minQuotaPartitions);
assignedPartitions.addAll(minQuotaPartitions);
allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, ownedPartitions.size()));
// this consumer is potential maxQuota candidate since we're still under the number of expected max capacity members
if (numMembersHavingMorePartitions < expectedNumMembersHavingMorePartitions) {
// this consumer is potential maxQuota candidate since we're still under the number of expected members
// with more than the minQuota partitions. Note, if the number of expected members with more than
// the minQuota partitions is 0, it means minQuota == maxQuota, so they won't be put into unfilledMembers
if (numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota) {
unfilledMembers.add(consumer);
}
}
@ -242,6 +244,9 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
if (unfilledMembers.isEmpty()) {
// Should not enter here since we have calculated the exact number to assign to each consumer
// There might be issues in the assigning algorithm, or maybe assigning the same partition to two owners.
int currentPartitionIndex = unassignedPartitions.indexOf(unassignedPartition);
log.error("No more unfilled consumers to be assigned. The remaining unassigned partitions are: {}",
unassignedPartitions.subList(currentPartitionIndex, unassignedPartitions.size()));
throw new IllegalStateException("No more unfilled consumers to be assigned.");
}
unfilledConsumerIter = unfilledMembers.iterator();
@ -255,27 +260,35 @@ public abstract class AbstractStickyAssignor extends AbstractPartitionAssignor {
partitionsTransferringOwnership.put(unassignedPartition, consumer);
int currentAssignedCount = consumerAssignment.size();
int expectedAssignedCount = numMembersHavingMorePartitions < expectedNumMembersHavingMorePartitions ? maxQuota : minQuota;
int expectedAssignedCount = numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota ? maxQuota : minQuota;
if (currentAssignedCount == expectedAssignedCount) {
if (currentAssignedCount == maxQuota) {
numMembersHavingMorePartitions++;
numMembersAssignedOverMinQuota++;
}
unfilledConsumerIter.remove();
}
}
if (!unfilledMembers.isEmpty()) {
// we expected all the remaining unfilled members have minQuota partitions and we're already at the allowed number
// of max capacity members. Otherwise, there must be error here.
if (numMembersHavingMorePartitions != expectedNumMembersHavingMorePartitions) {
throw new IllegalStateException(String.format("We haven't reached the allowed number of max capacity members, " +
"but no more partitions to be assigned to unfilled consumers: %s", unfilledMembers));
// we expected all the remaining unfilled members have minQuota partitions and we're already at the expected number
// of members with more than the minQuota partitions. Otherwise, there must be error here.
if (numMembersAssignedOverMinQuota != expectedNumMembersAssignedOverMinQuota) {
log.error("Current number of members with more than the minQuota partitions: {}, is less than the expected number " +
"of members with more than the minQuota partitions: {}, and no more partitions to be assigned to the remaining unfilled consumers: {}",
numMembersAssignedOverMinQuota, expectedNumMembersAssignedOverMinQuota, unfilledMembers);
throw new IllegalStateException("We haven't reached the expected number of members with " +
"more than the minQuota partitions, but no more partitions to be assigned");
} else {
for (String unfilledMember : unfilledMembers) {
int assignedPartitionsCount = assignment.get(unfilledMember).size();
if (assignedPartitionsCount != minQuota) {
throw new IllegalStateException(String.format("Consumer: [%s] should have %d partitions, but got %d partitions, " +
"and no more partitions to be assigned", unfilledMember, minQuota, assignedPartitionsCount));
log.error("Consumer: [{}] should have {} partitions, but got {} partitions, and no more partitions " +
"to be assigned. The remaining unfilled consumers are: {}", unfilledMember, minQuota, assignedPartitionsCount, unfilledMembers);
throw new IllegalStateException(String.format("Consumer: [%s] doesn't reach minQuota partitions, " +
"and no more partitions to be assigned", unfilledMember));
} else {
log.trace("skip over this unfilled member: [{}] because we've reached the expected number of " +
"members with more than the minQuota partitions, and this member already have minQuota partitions", unfilledMember);
}
}
}