KAFKA-19370: Create JMH benchmark for share group assignor (#19907)

As part of readying share groups for production, we want to ensure that
the performance of the server-side assignor is optimal. In common with
consumer group assignors, a JMH benchmark is used for the analysis.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>
This commit is contained in:
Andrew Schofield 2025-06-06 08:29:18 +01:00 committed by GitHub
parent a090dc3ba5
commit 5cf8b2abb0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 311 additions and 6 deletions

View File

@ -26,8 +26,10 @@ import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.GroupSpecImpl;
import org.apache.kafka.coordinator.group.modern.MemberSubscriptionAndAssignmentImpl;
import org.apache.kafka.coordinator.group.modern.ModernGroupMember;
import org.apache.kafka.coordinator.group.modern.TopicIds;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
@ -113,24 +115,24 @@ public class AssignorBenchmarkUtils {
}
/**
* Creates a GroupSpec from the given ConsumerGroupMembers.
* Creates a GroupSpec from the given ModernGroupMembers.
*
* @param members The ConsumerGroupMembers.
* @param subscriptionType  The group's subscription type.
* @param members The ModernGroupMembers.
* @param subscriptionType The group's subscription type.
* @param topicResolver The TopicResolver to use.
* @return The new GroupSpec.
*/
public static GroupSpec createGroupSpec(
Map<String, ConsumerGroupMember> members,
Map<String, ? extends ModernGroupMember> members,
SubscriptionType subscriptionType,
TopicIds.TopicResolver topicResolver
) {
Map<String, MemberSubscriptionAndAssignmentImpl> memberSpecs = new HashMap<>();
// Prepare the member spec for all members.
for (Map.Entry<String, ConsumerGroupMember> memberEntry : members.entrySet()) {
for (Map.Entry<String, ? extends ModernGroupMember> memberEntry : members.entrySet()) {
String memberId = memberEntry.getKey();
ConsumerGroupMember member = memberEntry.getValue();
ModernGroupMember member = memberEntry.getValue();
memberSpecs.put(memberId, new MemberSubscriptionAndAssignmentImpl(
Optional.ofNullable(member.rackId()),
@ -237,6 +239,88 @@ public class AssignorBenchmarkUtils {
return members;
}
/**
* Creates a ShareGroupMembers map where all members have the same topic subscriptions.
*
* @param memberCount The number of members in the group.
* @param getMemberId A function to map member indices to member ids.
* @param topicNames The topics to subscribe to.
* @return The new ShareGroupMembers map.
*/
public static Map<String, ShareGroupMember> createHomogeneousShareGroupMembers(
int memberCount,
Function<Integer, String> getMemberId,
List<String> topicNames
) {
Map<String, ShareGroupMember> members = new HashMap<>();
for (int i = 0; i < memberCount; i++) {
String memberId = getMemberId.apply(i);
members.put(memberId, new ShareGroupMember.Builder("member" + i)
.setSubscribedTopicNames(topicNames)
.build()
);
}
return members;
}
/**
* Creates a ShareGroupMembers map where members have different topic subscriptions.
*
* Divides members and topics into a given number of buckets. Within each bucket, members are
* subscribed to the same topics.
*
* @param memberCount The number of members in the group.
* @param bucketCount The number of buckets.
* @param getMemberId A function to map member indices to member ids.
* @param topicNames The topics to subscribe to.
* @return The new ShareGroupMembers map.
*/
public static Map<String, ShareGroupMember> createHeterogeneousBucketedShareGroupMembers(
int memberCount,
int bucketCount,
Function<Integer, String> getMemberId,
List<String> topicNames
) {
Map<String, ShareGroupMember> members = new HashMap<>();
// Adjust bucket count based on member count when member count < max bucket count.
bucketCount = Math.min(bucketCount, memberCount);
// Check minimum topics requirement
if (topicNames.size() < bucketCount) {
throw new IllegalArgumentException("At least " + bucketCount + " topics are recommended for effective bucketing.");
}
int bucketSizeTopics = (int) Math.ceil((double) topicNames.size() / bucketCount);
int bucketSizeMembers = (int) Math.ceil((double) memberCount / bucketCount);
// Define buckets for each member and assign topics from the same bucket
for (int bucket = 0; bucket < bucketCount; bucket++) {
int memberStartIndex = bucket * bucketSizeMembers;
int memberEndIndex = Math.min((bucket + 1) * bucketSizeMembers, memberCount);
int topicStartIndex = bucket * bucketSizeTopics;
int topicEndIndex = Math.min((bucket + 1) * bucketSizeTopics, topicNames.size());
List<String> bucketTopicNames = topicNames.subList(topicStartIndex, topicEndIndex);
// Assign topics to each member in the current bucket
for (int i = memberStartIndex; i < memberEndIndex; i++) {
String memberId = getMemberId.apply(i);
members.put(memberId, new ShareGroupMember.Builder("member" + i)
.setSubscribedTopicNames(bucketTopicNames)
.build()
);
}
}
return members;
}
public static void addTopic(
MetadataDelta delta,
Uuid topicId,

View File

@ -0,0 +1,221 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.jmh.assignor;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.assignor.SimpleAssignor;
import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.GroupSpecImpl;
import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl;
import org.apache.kafka.coordinator.group.modern.MemberSubscriptionAndAssignmentImpl;
import org.apache.kafka.coordinator.group.modern.SubscribedTopicDescriberImpl;
import org.apache.kafka.coordinator.group.modern.TopicIds;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember;
import org.apache.kafka.image.MetadataImage;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HETEROGENEOUS;
import static org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 5)
@Measurement(iterations = 5)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class ShareGroupAssignorBenchmark {
public enum AssignorType {
SIMPLE(new SimpleAssignor());
private final PartitionAssignor assignor;
AssignorType(PartitionAssignor assignor) {
this.assignor = assignor;
}
public PartitionAssignor assignor() {
return assignor;
}
}
/**
* The assignment type is decided based on whether all the members are assigned partitions
* for the first time (full), or incrementally when a rebalance is triggered.
*/
public enum AssignmentType {
FULL, INCREMENTAL
}
@Param({"10", "100", "1000"})
private int memberCount;
@Param({"1", "10", "100"})
private int partitionCount;
@Param({"10", "100"})
private int topicCount;
@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
private SubscriptionType subscriptionType;
@Param({"SIMPLE"})
private AssignorType assignorType;
@Param({"FULL", "INCREMENTAL"})
private AssignmentType assignmentType;
private PartitionAssignor partitionAssignor;
/** The number of homogeneous subgroups to create for the heterogeneous subscription case. */
private static final int MAX_BUCKET_COUNT = 5;
private GroupSpec groupSpec;
private List<String> allTopicNames = Collections.emptyList();
private TopicIds.TopicResolver topicResolver;
private SubscribedTopicDescriber subscribedTopicDescriber;
@Setup(Level.Trial)
public void setup() {
partitionAssignor = assignorType.assignor();
setupTopics();
Map<String, ShareGroupMember> members = createMembers();
this.groupSpec = AssignorBenchmarkUtils.createGroupSpec(members, subscriptionType, topicResolver);
if (assignmentType == AssignmentType.INCREMENTAL) {
simulateIncrementalRebalance();
}
}
private void setupTopics() {
allTopicNames = AssignorBenchmarkUtils.createTopicNames(topicCount);
MetadataImage metadataImage = AssignorBenchmarkUtils.createMetadataImage(allTopicNames, partitionCount);
topicResolver = new TopicIds.CachedTopicResolver(metadataImage.topics());
subscribedTopicDescriber = new SubscribedTopicDescriberImpl(metadataImage);
}
private Map<String, ShareGroupMember> createMembers() {
// In the incremental rebalance case, we will add the last member as a trigger.
// This is done to keep the total members count consistent with the input.
int numberOfMembers = assignmentType.equals(AssignmentType.INCREMENTAL) ? memberCount - 1 : memberCount;
if (subscriptionType == HOMOGENEOUS) {
return AssignorBenchmarkUtils.createHomogeneousShareGroupMembers(
numberOfMembers,
this::memberId,
allTopicNames
);
} else {
return AssignorBenchmarkUtils.createHeterogeneousBucketedShareGroupMembers(
numberOfMembers,
MAX_BUCKET_COUNT,
this::memberId,
allTopicNames
);
}
}
private String memberId(int memberIndex) {
return "member" + memberIndex;
}
private void simulateIncrementalRebalance() {
GroupAssignment initialAssignment = partitionAssignor.assign(groupSpec, subscribedTopicDescriber);
Map<String, MemberAssignment> members = initialAssignment.members();
Map<Uuid, Map<Integer, String>> invertedTargetAssignment = AssignorBenchmarkUtils.computeInvertedTargetAssignment(initialAssignment);
Map<String, MemberSubscriptionAndAssignmentImpl> updatedMemberSpec = new HashMap<>();
for (String memberId : groupSpec.memberIds()) {
MemberAssignment memberAssignment = members.getOrDefault(
memberId,
new MemberAssignmentImpl(Collections.emptyMap())
);
updatedMemberSpec.put(memberId, new MemberSubscriptionAndAssignmentImpl(
groupSpec.memberSubscription(memberId).rackId(),
Optional.empty(),
groupSpec.memberSubscription(memberId).subscribedTopicIds(),
new Assignment(Collections.unmodifiableMap(memberAssignment.partitions()))
));
}
Set<Uuid> subscribedTopicIdsForNewMember;
if (subscriptionType == HETEROGENEOUS) {
subscribedTopicIdsForNewMember = updatedMemberSpec.get(memberId(memberCount - MAX_BUCKET_COUNT)).subscribedTopicIds();
} else {
subscribedTopicIdsForNewMember = new TopicIds(new HashSet<>(allTopicNames), topicResolver);
}
updatedMemberSpec.put("newMember", new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
subscribedTopicIdsForNewMember,
Assignment.EMPTY
));
groupSpec = new GroupSpecImpl(
updatedMemberSpec,
subscriptionType,
invertedTargetAssignment
);
}
@Benchmark
@Threads(1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void doAssignment() {
topicResolver.clear();
partitionAssignor.assign(groupSpec, subscribedTopicDescriber);
}
}