diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java index 4bfcd727852..b9ce9bab9af 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java @@ -20,13 +20,30 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.metadata.PartitionRecord; import org.apache.kafka.common.metadata.TopicRecord; import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.api.assignor.GroupSpec; import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType; +import org.apache.kafka.coordinator.group.modern.Assignment; +import org.apache.kafka.coordinator.group.modern.GroupSpecImpl; +import org.apache.kafka.coordinator.group.modern.MemberSubscriptionAndAssignmentImpl; +import org.apache.kafka.coordinator.group.modern.TopicIds; +import org.apache.kafka.coordinator.group.modern.TopicMetadata; +import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember; import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.MetadataProvenance; +import org.apache.kafka.image.TopicsImage; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; +import java.util.function.Function; + public class AssignorBenchmarkUtils { /** @@ -57,6 +74,215 @@ public class AssignorBenchmarkUtils { return invertedTargetAssignment; } + /** + * Generates a list of topic names for use in benchmarks. + * + * @param topicCount The number of topic names to generate. + * @return The list of topic names. + */ + public static List createTopicNames(int topicCount) { + List topicNames = new ArrayList<>(); + for (int i = 0; i < topicCount; i++) { + topicNames.add("topic-" + i); + } + return topicNames; + } + + /** + * Creates a subscription metadata map for the given topics. + * + * @param topicNames The names of the topics. + * @param partitionsPerTopic The number of partitions per topic. + * @param getTopicPartitionRacks A function to get the racks map for each topic. May return + * an empty map if no rack info is desired. + * @return The subscription metadata map. + */ + public static Map createSubscriptionMetadata( + List topicNames, + int partitionsPerTopic, + Function>> getTopicPartitionRacks + ) { + Map subscriptionMetadata = new HashMap<>(); + + for (String topicName : topicNames) { + Uuid topicId = Uuid.randomUuid(); + + TopicMetadata metadata = new TopicMetadata( + topicId, + topicName, + partitionsPerTopic, + getTopicPartitionRacks.apply(topicName) + ); + subscriptionMetadata.put(topicName, metadata); + } + + return subscriptionMetadata; + } + + /** + * Creates a topic metadata map from the given subscription metadata. + * + * @param subscriptionMetadata The subscription metadata. + * @return The topic metadata map. + */ + public static Map createTopicMetadata( + Map subscriptionMetadata + ) { + Map topicMetadata = new HashMap<>((int) (subscriptionMetadata.size() / 0.75f + 1)); + for (Map.Entry entry : subscriptionMetadata.entrySet()) { + topicMetadata.put(entry.getValue().id(), entry.getValue()); + } + return topicMetadata; + } + + /** + * Creates a TopicsImage from the given subscription metadata. + * + * @param subscriptionMetadata The subscription metadata. + * @return A TopicsImage containing the topic ids, names and partition counts from the + * subscription metadata. + */ + public static TopicsImage createTopicsImage(Map subscriptionMetadata) { + MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY); + + for (Map.Entry entry : subscriptionMetadata.entrySet()) { + TopicMetadata topicMetadata = entry.getValue(); + AssignorBenchmarkUtils.addTopic( + delta, + topicMetadata.id(), + topicMetadata.name(), + topicMetadata.numPartitions() + ); + } + + return delta.apply(MetadataProvenance.EMPTY).topics(); + } + + /** + * Creates a GroupSpec from the given ConsumerGroupMembers. + * + * @param members The ConsumerGroupMembers. + * @param subscriptionType  The group's subscription type. + * @param topicsImage The TopicsImage to use. + * @return The new GroupSpec. + */ + public static GroupSpec createGroupSpec( + Map members, + SubscriptionType subscriptionType, + TopicsImage topicsImage + ) { + Map memberSpecs = new HashMap<>(); + + // Prepare the member spec for all members. + for (Map.Entry memberEntry : members.entrySet()) { + String memberId = memberEntry.getKey(); + ConsumerGroupMember member = memberEntry.getValue(); + + memberSpecs.put(memberId, new MemberSubscriptionAndAssignmentImpl( + Optional.ofNullable(member.rackId()), + Optional.ofNullable(member.instanceId()), + new TopicIds(member.subscribedTopicNames(), topicsImage), + new Assignment(member.assignedPartitions()) + )); + } + + return new GroupSpecImpl( + memberSpecs, + subscriptionType, + Collections.emptyMap() + ); + } + + /** + * Creates a ConsumerGroupMembers map where all members have the same topic subscriptions. + * + * @param memberCount The number of members in the group. + * @param getMemberId A function to map member indices to member ids. + * @param getMemberRackId A function to map member indices to rack ids. + * @param topicNames The topics to subscribe to. + * @return The new ConsumerGroupMembers map. + */ + public static Map createHomogeneousMembers( + int memberCount, + Function getMemberId, + Function> getMemberRackId, + List topicNames + ) { + Map members = new HashMap<>(); + + for (int i = 0; i < memberCount; i++) { + String memberId = getMemberId.apply(i); + Optional rackId = getMemberRackId.apply(i); + + members.put(memberId, new ConsumerGroupMember.Builder("member" + i) + .setRackId(rackId.orElse(null)) + .setSubscribedTopicNames(topicNames) + .build() + ); + } + + return members; + } + + /** + * Creates a ConsumerGroupMembers map where members have different topic subscriptions. + * + * Divides members and topics into a given number of buckets. Within each bucket, members are + * subscribed to the same topics. + * + * @param memberCount The number of members in the group. + * @param bucketCount The number of buckets. + * @param getMemberId A function to map member indices to member ids. + * @param getMemberRackId A function to map member indices to rack ids. + * @param topicNames The topics to subscribe to. + * @return The new ConsumerGroupMembers map. + */ + public static Map createHeterogeneousBucketedMembers( + int memberCount, + int bucketCount, + Function getMemberId, + Function> getMemberRackId, + List topicNames + ) { + Map members = new HashMap<>(); + + // Adjust bucket count based on member count when member count < max bucket count. + bucketCount = Math.min(bucketCount, memberCount); + + // Check minimum topics requirement + if (topicNames.size() < bucketCount) { + throw new IllegalArgumentException("At least " + bucketCount + " topics are recommended for effective bucketing."); + } + + int bucketSizeTopics = (int) Math.ceil((double) topicNames.size() / bucketCount); + int bucketSizeMembers = (int) Math.ceil((double) memberCount / bucketCount); + + // Define buckets for each member and assign topics from the same bucket + for (int bucket = 0; bucket < bucketCount; bucket++) { + int memberStartIndex = bucket * bucketSizeMembers; + int memberEndIndex = Math.min((bucket + 1) * bucketSizeMembers, memberCount); + + int topicStartIndex = bucket * bucketSizeTopics; + int topicEndIndex = Math.min((bucket + 1) * bucketSizeTopics, topicNames.size()); + + List bucketTopicNames = topicNames.subList(topicStartIndex, topicEndIndex); + + // Assign topics to each member in the current bucket + for (int i = memberStartIndex; i < memberEndIndex; i++) { + String memberId = getMemberId.apply(i); + Optional rackId = getMemberRackId.apply(i); + + members.put(memberId, new ConsumerGroupMember.Builder("member" + i) + .setRackId(rackId.orElse(null)) + .setSubscribedTopicNames(bucketTopicNames) + .build() + ); + } + } + + return members; + } + public static void addTopic( MetadataDelta delta, Uuid topicId, diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java index cbd074c8896..82f11350381 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java @@ -18,6 +18,7 @@ package org.apache.kafka.jmh.assignor; import org.apache.kafka.common.Uuid; import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.api.assignor.GroupSpec; import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment; import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignor; import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber; @@ -31,9 +32,7 @@ import org.apache.kafka.coordinator.group.modern.MemberSubscriptionAndAssignment import org.apache.kafka.coordinator.group.modern.SubscribedTopicDescriberImpl; import org.apache.kafka.coordinator.group.modern.TopicIds; import org.apache.kafka.coordinator.group.modern.TopicMetadata; -import org.apache.kafka.image.MetadataDelta; -import org.apache.kafka.image.MetadataImage; -import org.apache.kafka.image.MetadataProvenance; +import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember; import org.apache.kafka.image.TopicsImage; import org.openjdk.jmh.annotations.Benchmark; @@ -50,7 +49,6 @@ import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.Threads; import org.openjdk.jmh.annotations.Warmup; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -122,127 +120,80 @@ public class ServerSideAssignorBenchmark { private static final int MAX_BUCKET_COUNT = 5; - private GroupSpecImpl groupSpec; + private GroupSpec groupSpec; - private SubscribedTopicDescriber subscribedTopicDescriber; + private List allTopicNames = Collections.emptyList(); - private final List allTopicNames = new ArrayList<>(); + private Map subscriptionMetadata = Collections.emptyMap(); private TopicsImage topicsImage = TopicsImage.EMPTY; + private SubscribedTopicDescriber subscribedTopicDescriber; + @Setup(Level.Trial) public void setup() { - Map topicMetadata = createTopicMetadata(); - subscribedTopicDescriber = new SubscribedTopicDescriberImpl(topicMetadata); - - createGroupSpec(); - partitionAssignor = assignorType.assignor(); + setupTopics(); + + Map members = createMembers(); + this.groupSpec = AssignorBenchmarkUtils.createGroupSpec(members, subscriptionType, topicsImage); + if (assignmentType == AssignmentType.INCREMENTAL) { simulateIncrementalRebalance(); } } - private Map createTopicMetadata() { - MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY); - Map topicMetadata = new HashMap<>(); - int partitionsPerTopicCount = (memberCount * partitionsToMemberRatio) / topicCount; + private void setupTopics() { + allTopicNames = AssignorBenchmarkUtils.createTopicNames(topicCount); + int partitionsPerTopic = (memberCount * partitionsToMemberRatio) / topicCount; Map> partitionRacks = isRackAware ? - mkMapOfPartitionRacks(partitionsPerTopicCount) : + mkMapOfPartitionRacks(partitionsPerTopic) : Collections.emptyMap(); + subscriptionMetadata = AssignorBenchmarkUtils.createSubscriptionMetadata( + allTopicNames, + partitionsPerTopic, + topicName -> partitionRacks + ); - for (int i = 0; i < topicCount; i++) { - Uuid topicUuid = Uuid.randomUuid(); - String topicName = "topic" + i; - allTopicNames.add(topicName); - topicMetadata.put(topicUuid, new TopicMetadata( - topicUuid, - topicName, - partitionsPerTopicCount, - partitionRacks - )); + topicsImage = AssignorBenchmarkUtils.createTopicsImage(subscriptionMetadata); - AssignorBenchmarkUtils.addTopic( - delta, - topicUuid, - topicName, - partitionsPerTopicCount - ); - } - - topicsImage = delta.apply(MetadataProvenance.EMPTY).topics(); - return topicMetadata; + Map topicMetadata = AssignorBenchmarkUtils.createTopicMetadata(subscriptionMetadata); + subscribedTopicDescriber = new SubscribedTopicDescriberImpl(topicMetadata); } - private void createGroupSpec() { - Map members = new HashMap<>(); - + private Map createMembers() { // In the rebalance case, we will add the last member as a trigger. // This is done to keep the total members count consistent with the input. int numberOfMembers = assignmentType.equals(AssignmentType.INCREMENTAL) ? memberCount - 1 : memberCount; if (subscriptionType == HOMOGENEOUS) { - for (int i = 0; i < numberOfMembers; i++) { - addMemberSpec(members, i, new TopicIds(new HashSet<>(allTopicNames), topicsImage)); - } + return AssignorBenchmarkUtils.createHomogeneousMembers( + numberOfMembers, + this::memberId, + this::rackId, + allTopicNames + ); } else { - // Adjust bucket count based on member count when member count < max bucket count. - int bucketCount = Math.min(MAX_BUCKET_COUNT, numberOfMembers); - - // Check minimum topics requirement - if (topicCount < bucketCount) { - throw new IllegalArgumentException("At least " + bucketCount + " topics are recommended for effective bucketing."); - } - - int bucketSizeTopics = (int) Math.ceil((double) topicCount / bucketCount); - int bucketSizeMembers = (int) Math.ceil((double) numberOfMembers / bucketCount); - - // Define buckets for each member and assign topics from the same bucket - for (int bucket = 0; bucket < bucketCount; bucket++) { - int memberStartIndex = bucket * bucketSizeMembers; - int memberEndIndex = Math.min((bucket + 1) * bucketSizeMembers, numberOfMembers); - - int topicStartIndex = bucket * bucketSizeTopics; - int topicEndIndex = Math.min((bucket + 1) * bucketSizeTopics, topicCount); - - TopicIds bucketTopics = new TopicIds(new HashSet<>(allTopicNames.subList(topicStartIndex, topicEndIndex)), topicsImage); - - // Assign topics to each member in the current bucket - for (int i = memberStartIndex; i < memberEndIndex; i++) { - addMemberSpec(members, i, bucketTopics); - } - } + return AssignorBenchmarkUtils.createHeterogeneousBucketedMembers( + numberOfMembers, + MAX_BUCKET_COUNT, + this::memberId, + this::rackId, + allTopicNames + ); } + } - this.groupSpec = new GroupSpecImpl( - members, - subscriptionType, - Collections.emptyMap() - ); + private String memberId(int memberIndex) { + return "member" + memberIndex; } private Optional rackId(int memberIndex) { return isRackAware ? Optional.of("rack" + memberIndex % NUMBER_OF_RACKS) : Optional.empty(); } - private void addMemberSpec( - Map members, - int memberIndex, - Set subscribedTopicIds - ) { - String memberId = "member" + memberIndex; - Optional rackId = rackId(memberIndex); - - members.put(memberId, new MemberSubscriptionAndAssignmentImpl( - rackId, - Optional.empty(), - subscribedTopicIds, - Assignment.EMPTY - )); - } - private static Map> mkMapOfPartitionRacks(int numPartitions) { Map> partitionRacks = new HashMap<>(numPartitions); for (int i = 0; i < numPartitions; i++) { @@ -279,7 +230,7 @@ public class ServerSideAssignorBenchmark { Set subscribedTopicIdsForNewMember; if (subscriptionType == HETEROGENEOUS) { - subscribedTopicIdsForNewMember = updatedMemberSpec.get("member" + (memberCount - 2)).subscribedTopicIds(); + subscribedTopicIdsForNewMember = updatedMemberSpec.get(memberId(memberCount - 2)).subscribedTopicIds(); } else { subscribedTopicIdsForNewMember = new TopicIds(new HashSet<>(allTopicNames), topicsImage); } diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java index 4734036d693..adf40671ebc 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java @@ -18,20 +18,16 @@ package org.apache.kafka.jmh.assignor; import org.apache.kafka.common.Uuid; import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.api.assignor.GroupSpec; import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment; import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber; import org.apache.kafka.coordinator.group.assignor.UniformAssignor; import org.apache.kafka.coordinator.group.modern.Assignment; -import org.apache.kafka.coordinator.group.modern.GroupSpecImpl; -import org.apache.kafka.coordinator.group.modern.MemberSubscriptionAndAssignmentImpl; import org.apache.kafka.coordinator.group.modern.SubscribedTopicDescriberImpl; import org.apache.kafka.coordinator.group.modern.TargetAssignmentBuilder; -import org.apache.kafka.coordinator.group.modern.TopicIds; import org.apache.kafka.coordinator.group.modern.TopicMetadata; import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember; -import org.apache.kafka.image.MetadataDelta; -import org.apache.kafka.image.MetadataImage; -import org.apache.kafka.image.MetadataProvenance; import org.apache.kafka.image.TopicsImage; import org.openjdk.jmh.annotations.Benchmark; @@ -48,10 +44,8 @@ import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.Threads; import org.openjdk.jmh.annotations.Warmup; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -83,27 +77,30 @@ public class TargetAssignmentBuilderBenchmark { private PartitionAssignor partitionAssignor; - private Map subscriptionMetadata = Collections.emptyMap(); - private TargetAssignmentBuilder targetAssignmentBuilder; - private GroupSpecImpl groupSpec; + private GroupSpec groupSpec; private Map> invertedTargetAssignment; - private final List allTopicNames = new ArrayList<>(); + private List allTopicNames = Collections.emptyList(); + + private Map subscriptionMetadata = Collections.emptyMap(); private TopicsImage topicsImage; + private SubscribedTopicDescriber subscribedTopicDescriber; + @Setup(Level.Trial) public void setup() { // For this benchmark we will use the Uniform Assignor // and a group that has a homogeneous subscription model. partitionAssignor = new UniformAssignor(); - subscriptionMetadata = generateMockSubscriptionMetadata(); - Map members = generateMockMembers(); - Map existingTargetAssignment = generateMockInitialTargetAssignmentAndUpdateInvertedTargetAssignment(); + setupTopics(); + + Map members = createMembers(); + Map existingTargetAssignment = generateMockInitialTargetAssignmentAndUpdateInvertedTargetAssignment(members); ConsumerGroupMember newMember = new ConsumerGroupMember.Builder("newMember") .setSubscribedTopicNames(allTopicNames) @@ -119,62 +116,34 @@ public class TargetAssignmentBuilderBenchmark { .addOrUpdateMember(newMember.memberId(), newMember); } - private Map generateMockMembers() { - Map members = new HashMap<>(); + private void setupTopics() { + allTopicNames = AssignorBenchmarkUtils.createTopicNames(topicCount); - for (int i = 0; i < memberCount - 1; i++) { - ConsumerGroupMember member = new ConsumerGroupMember.Builder("member" + i) - .setSubscribedTopicNames(allTopicNames) - .build(); - members.put("member" + i, member); - } - return members; - } - - private Map generateMockSubscriptionMetadata() { - Map subscriptionMetadata = new HashMap<>(); - MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY); - int partitionsPerTopicCount = (memberCount * partitionsToMemberRatio) / topicCount; - - for (int i = 0; i < topicCount; i++) { - String topicName = "topic-" + i; - Uuid topicId = Uuid.randomUuid(); - allTopicNames.add(topicName); - - TopicMetadata metadata = new TopicMetadata( - topicId, - topicName, - partitionsPerTopicCount, - Collections.emptyMap() - ); - subscriptionMetadata.put(topicName, metadata); - - AssignorBenchmarkUtils.addTopic( - delta, - topicId, - topicName, - partitionsPerTopicCount - ); - } - - topicsImage = delta.apply(MetadataProvenance.EMPTY).topics(); - return subscriptionMetadata; - } - - private Map generateMockInitialTargetAssignmentAndUpdateInvertedTargetAssignment() { - Map topicMetadataMap = new HashMap<>(topicCount); - subscriptionMetadata.forEach((topicName, topicMetadata) -> - topicMetadataMap.put( - topicMetadata.id(), - topicMetadata - ) + int partitionsPerTopic = (memberCount * partitionsToMemberRatio) / topicCount; + subscriptionMetadata = AssignorBenchmarkUtils.createSubscriptionMetadata( + allTopicNames, + partitionsPerTopic, + topicName -> Collections.emptyMap() ); - createAssignmentSpec(); + topicsImage = AssignorBenchmarkUtils.createTopicsImage(subscriptionMetadata); + + Map topicMetadata = AssignorBenchmarkUtils.createTopicMetadata(subscriptionMetadata); + subscribedTopicDescriber = new SubscribedTopicDescriberImpl(topicMetadata); + } + + private Map generateMockInitialTargetAssignmentAndUpdateInvertedTargetAssignment( + Map members + ) { + this.groupSpec = AssignorBenchmarkUtils.createGroupSpec( + members, + HOMOGENEOUS, + topicsImage + ); GroupAssignment groupAssignment = partitionAssignor.assign( groupSpec, - new SubscribedTopicDescriberImpl(topicMetadataMap) + subscribedTopicDescriber ); invertedTargetAssignment = AssignorBenchmarkUtils.computeInvertedTargetAssignment(groupAssignment); @@ -189,26 +158,23 @@ public class TargetAssignmentBuilderBenchmark { return initialTargetAssignment; } - private void createAssignmentSpec() { - Map members = new HashMap<>(); - - for (int i = 0; i < memberCount - 1; i++) { - String memberId = "member" + i; - - members.put(memberId, new MemberSubscriptionAndAssignmentImpl( - Optional.empty(), - Optional.empty(), - new TopicIds(new HashSet<>(allTopicNames), topicsImage), - Assignment.EMPTY - )); - } - groupSpec = new GroupSpecImpl( - members, - HOMOGENEOUS, - Collections.emptyMap() + private Map createMembers() { + return AssignorBenchmarkUtils.createHomogeneousMembers( + memberCount - 1, + this::memberId, + this::rackId, + allTopicNames ); } + private String memberId(int memberIndex) { + return "member" + memberIndex; + } + + private Optional rackId(int memberIndex) { + return Optional.empty(); + } + @Benchmark @Threads(1) @OutputTimeUnit(TimeUnit.MILLISECONDS)