MINOR: Rename uniform assignor's internal builders (#16233)

This patch renames the uniform assignor's builders to match the `SubscriptionType` which is used to determine which one is called. It removes the abstract class `AbstractUniformAssignmentBuilder` which is not necessary anymore. It also applies minor refactoring.

Reviewers: Ritika Reddy <rreddy@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
David Jacot 2024-06-10 14:26:56 +02:00
parent db3bf4ae3d
commit a2760e0131
6 changed files with 57 additions and 92 deletions

View File

@ -1,71 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
import org.apache.kafka.server.common.TopicIdPartition;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* The assignment builder is used to construct the target assignment based on the members' subscriptions.
*/
public abstract class AbstractUniformAssignmentBuilder {
protected abstract GroupAssignment buildAssignment();
/**
* Adds the topic's partition to the member's target assignment.
*/
protected static void addPartitionToAssignment(
Map<String, MemberAssignment> memberAssignments,
String memberId,
Uuid topicId,
int partition
) {
memberAssignments.get(memberId)
.partitions()
.computeIfAbsent(topicId, __ -> new HashSet<>())
.add(partition);
}
/**
* Constructs a set of {@code TopicIdPartition} including all the given topic Ids based on their partition counts.
*
* @param topicIds Collection of topic Ids.
* @param subscribedTopicDescriber Describer to fetch partition counts for topics.
*
* @return Set of {@code TopicIdPartition} including all the provided topic Ids.
*/
protected static Set<TopicIdPartition> topicIdPartitions(
Collection<Uuid> topicIds,
SubscribedTopicDescriber subscribedTopicDescriber
) {
return topicIds.stream()
.flatMap(topic -> IntStream
.range(0, subscribedTopicDescriber.numPartitions(topic))
.mapToObj(i -> new TopicIdPartition(topic, i))
).collect(Collectors.toSet());
}
}

View File

@ -35,19 +35,19 @@ import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.H
* subscriptions across the group members: * subscriptions across the group members:
* <ul> * <ul>
* <li> * <li>
* <b> Optimized Uniform Assignment Builder: </b> This strategy is used when all members have subscribed * <b> Uniform Homogeneous Assignment Builder: </b> This strategy is used when all members have subscribed
* to the same set of topics. * to the same set of topics.
* </li> * </li>
* <li> * <li>
* <b> General Uniform Assignment Builder: </b> This strategy is used when members have varied topic * <b> Uniform Heterogeneous Assignment Builder: </b> This strategy is used when members have varied topic
* subscriptions. * subscriptions.
* </li> * </li>
* </ul> * </ul>
* *
* The appropriate strategy is automatically chosen based on the current members' topic subscriptions. * The appropriate strategy is automatically chosen based on the current members' topic subscriptions.
* *
* @see OptimizedUniformAssignmentBuilder * @see UniformHomogeneousAssignmentBuilder
* @see GeneralUniformAssignmentBuilder * @see UniformHeterogeneousAssignmentBuilder
*/ */
public class UniformAssignor implements ConsumerGroupPartitionAssignor { public class UniformAssignor implements ConsumerGroupPartitionAssignor {
private static final Logger LOG = LoggerFactory.getLogger(UniformAssignor.class); private static final Logger LOG = LoggerFactory.getLogger(UniformAssignor.class);
@ -76,14 +76,14 @@ public class UniformAssignor implements ConsumerGroupPartitionAssignor {
if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) { if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
LOG.debug("Detected that all members are subscribed to the same set of topics, invoking the " LOG.debug("Detected that all members are subscribed to the same set of topics, invoking the "
+ "optimized assignment algorithm"); + "homogeneous assignment algorithm");
return new OptimizedUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber) return new UniformHomogeneousAssignmentBuilder(groupSpec, subscribedTopicDescriber)
.build(); .build();
} else { } else {
LOG.debug("Detected that the members are subscribed to different sets of topics, invoking the " LOG.debug("Detected that the members are subscribed to different sets of topics, invoking the "
+ "general assignment algorithm"); + "heterogeneous assignment algorithm");
return new GeneralUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber) return new UniformHeterogeneousAssignmentBuilder(groupSpec, subscribedTopicDescriber)
.buildAssignment(); .build();
} }
} }
} }

View File

@ -41,7 +41,7 @@ import java.util.TreeSet;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
* The general uniform assignment builder is used to generate the target assignment for a consumer group with * The heterogeneous uniform assignment builder is used to generate the target assignment for a consumer group with
* at least one of its members subscribed to a different set of topics. * at least one of its members subscribed to a different set of topics.
* *
* Assignments are done according to the following principles: * Assignments are done according to the following principles:
@ -55,8 +55,8 @@ import java.util.stream.Collectors;
* This assignment builder prioritizes the above properties in the following order: * This assignment builder prioritizes the above properties in the following order:
* Balance > Stickiness. * Balance > Stickiness.
*/ */
public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBuilder { public class UniformHeterogeneousAssignmentBuilder {
private static final Logger LOG = LoggerFactory.getLogger(GeneralUniformAssignmentBuilder.class); private static final Logger LOG = LoggerFactory.getLogger(UniformHeterogeneousAssignmentBuilder.class);
/** /**
* The group metadata specification. * The group metadata specification.
@ -113,7 +113,7 @@ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBu
*/ */
private final PartitionMovements partitionMovements; private final PartitionMovements partitionMovements;
public GeneralUniformAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) { public UniformHeterogeneousAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
this.groupSpec = groupSpec; this.groupSpec = groupSpec;
this.subscribedTopicDescriber = subscribedTopicDescriber; this.subscribedTopicDescriber = subscribedTopicDescriber;
this.subscribedTopicIds = new HashSet<>(); this.subscribedTopicIds = new HashSet<>();
@ -133,7 +133,7 @@ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBu
targetAssignment.put(memberId, new MemberAssignmentImpl(new HashMap<>())); targetAssignment.put(memberId, new MemberAssignmentImpl(new HashMap<>()));
}) })
); );
this.unassignedPartitions = new HashSet<>(topicIdPartitions(subscribedTopicIds, subscribedTopicDescriber)); this.unassignedPartitions = topicIdPartitions(subscribedTopicIds, subscribedTopicDescriber);
this.assignedStickyPartitions = new HashSet<>(); this.assignedStickyPartitions = new HashSet<>();
this.assignmentManager = new AssignmentManager(this.subscribedTopicDescriber); this.assignmentManager = new AssignmentManager(this.subscribedTopicDescriber);
this.sortedMembersByAssignmentSize = assignmentManager.sortMembersByAssignmentSize(groupSpec.memberIds()); this.sortedMembersByAssignmentSize = assignmentManager.sortMembersByAssignmentSize(groupSpec.memberIds());
@ -148,8 +148,7 @@ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBu
* <li> Allocate all the remaining unassigned partitions to the members in a balanced manner.</li> * <li> Allocate all the remaining unassigned partitions to the members in a balanced manner.</li>
* <li> Iterate through the assignment until it is balanced. </li> * <li> Iterate through the assignment until it is balanced. </li>
*/ */
@Override public GroupAssignment build() {
protected GroupAssignment buildAssignment() {
if (subscribedTopicIds.isEmpty()) { if (subscribedTopicIds.isEmpty()) {
LOG.info("The subscription list is empty, returning an empty assignment"); LOG.info("The subscription list is empty, returning an empty assignment");
return new GroupAssignment(Collections.emptyMap()); return new GroupAssignment(Collections.emptyMap());
@ -462,6 +461,43 @@ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBu
assignmentManager.addPartitionToTargetAssignment(topicIdPartition, newMember); assignmentManager.addPartitionToTargetAssignment(topicIdPartition, newMember);
} }
/**
* Adds the topic's partition to the member's target assignment.
*/
private static void addPartitionToAssignment(
Map<String, MemberAssignment> memberAssignments,
String memberId,
Uuid topicId,
int partition
) {
memberAssignments.get(memberId)
.partitions()
.computeIfAbsent(topicId, __ -> new HashSet<>())
.add(partition);
}
/**
* Constructs a set of {@code TopicIdPartition} including all the given topic Ids based on their partition counts.
*
* @param topicIds Collection of topic Ids.
* @param subscribedTopicDescriber Describer to fetch partition counts for topics.
*
* @return Set of {@code TopicIdPartition} including all the provided topic Ids.
*/
private static Set<TopicIdPartition> topicIdPartitions(
Collection<Uuid> topicIds,
SubscribedTopicDescriber subscribedTopicDescriber
) {
Set<TopicIdPartition> topicIdPartitions = new HashSet<>();
for (Uuid topicId : topicIds) {
int numPartitions = subscribedTopicDescriber.numPartitions(topicId);
for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
topicIdPartitions.add(new TopicIdPartition(topicId, partitionId));
}
}
return topicIdPartitions;
}
/** /**
* This class represents a pair of member Ids involved in a partition reassignment. * This class represents a pair of member Ids involved in a partition reassignment.
* Each pair contains a source and a destination member Id. * Each pair contains a source and a destination member Id.

View File

@ -34,7 +34,7 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
/** /**
* The homogenous uniform assignment builder is used to generate the target assignment for a consumer group with * The homogeneous uniform assignment builder is used to generate the target assignment for a consumer group with
* all its members subscribed to the same set of topics. * all its members subscribed to the same set of topics.
* *
* Assignments are done according to the following principles: * Assignments are done according to the following principles:
@ -48,7 +48,7 @@ import java.util.Set;
* The assignment builder prioritizes the properties in the following order: * The assignment builder prioritizes the properties in the following order:
* Balance > Stickiness. * Balance > Stickiness.
*/ */
public class OptimizedUniformAssignmentBuilder { public class UniformHomogeneousAssignmentBuilder {
private static final Class<?> UNMODIFIABLE_MAP_CLASS = Collections.unmodifiableMap(new HashMap<>()).getClass(); private static final Class<?> UNMODIFIABLE_MAP_CLASS = Collections.unmodifiableMap(new HashMap<>()).getClass();
private static final Class<?> EMPTY_MAP_CLASS = Collections.emptyMap().getClass(); private static final Class<?> EMPTY_MAP_CLASS = Collections.emptyMap().getClass();
@ -104,7 +104,7 @@ public class OptimizedUniformAssignmentBuilder {
*/ */
private int remainingMembersToGetAnExtraPartition; private int remainingMembersToGetAnExtraPartition;
OptimizedUniformAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) { UniformHomogeneousAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
this.groupSpec = groupSpec; this.groupSpec = groupSpec;
this.subscribedTopicDescriber = subscribedTopicDescriber; this.subscribedTopicDescriber = subscribedTopicDescriber;
this.subscribedTopicIds = new HashSet<>(groupSpec.memberSubscription(groupSpec.memberIds().iterator().next()) this.subscribedTopicIds = new HashSet<>(groupSpec.memberSubscription(groupSpec.memberIds().iterator().next())

View File

@ -44,7 +44,7 @@ import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.H
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
public class GeneralUniformAssignmentBuilderTest { public class UniformHeterogeneousAssignmentBuilderTest {
private final UniformAssignor assignor = new UniformAssignor(); private final UniformAssignor assignor = new UniformAssignor();
private final Uuid topic1Uuid = Uuid.fromString("T1-A4s3VTwiI5CTbEp6POw"); private final Uuid topic1Uuid = Uuid.fromString("T1-A4s3VTwiI5CTbEp6POw");
private final Uuid topic2Uuid = Uuid.fromString("T2-B4s3VTwiI5YHbPp6YUe"); private final Uuid topic2Uuid = Uuid.fromString("T2-B4s3VTwiI5YHbPp6YUe");

View File

@ -76,7 +76,7 @@ public class ClientSideAssignorBenchmark {
/** /**
* The subscription pattern followed by the members of the group. * The subscription pattern followed by the members of the group.
* *
* A subscription model is considered homogenous if all the members of the group * A subscription model is considered homogeneous if all the members of the group
* are subscribed to the same set of topics, it is heterogeneous otherwise. * are subscribed to the same set of topics, it is heterogeneous otherwise.
*/ */
public enum SubscriptionModel { public enum SubscriptionModel {