diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ClientSideAssignorBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ClientSideAssignorBenchmark.java new file mode 100644 index 00000000000..1747632d09c --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ClientSideAssignorBenchmark.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.jmh.assignor; + +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; +import org.apache.kafka.clients.consumer.CooperativeStickyAssignor; +import org.apache.kafka.clients.consumer.RangeAssignor; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import static org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ClientSideAssignorBenchmark { + + public enum AssignorType { + RANGE(new RangeAssignor()), + COOPERATIVE_STICKY(new CooperativeStickyAssignor()); + + private final ConsumerPartitionAssignor assignor; + + AssignorType(ConsumerPartitionAssignor assignor) { + this.assignor = assignor; + } + + public ConsumerPartitionAssignor assignor() { + return assignor; + } + } + + /** + * The subscription pattern followed by the members of the group. + * + * A subscription model is considered homogenous if all the members of the group + * are subscribed to the same set of topics, it is heterogeneous otherwise. + */ + public enum SubscriptionModel { + HOMOGENEOUS, HETEROGENEOUS + } + + /** + * The assignment type is decided based on whether all the members are assigned partitions + * for the first time (full), or incrementally when a rebalance is triggered. + */ + public enum AssignmentType { + FULL, INCREMENTAL + } + + @Param({"100", "500", "1000", "5000", "10000"}) + private int memberCount; + + @Param({"5", "10", "50"}) + private int partitionsToMemberRatio; + + @Param({"10", "100", "1000"}) + private int topicCount; + + @Param({"true", "false"}) + private boolean isRackAware; + + @Param({"HOMOGENEOUS", "HETEROGENEOUS"}) + private SubscriptionModel subscriptionModel; + + @Param({"RANGE", "COOPERATIVE_STICKY"}) + private AssignorType assignorType; + + @Param({"FULL", "INCREMENTAL"}) + private AssignmentType assignmentType; + + private Map subscriptions = new HashMap<>(); + + private ConsumerPartitionAssignor.GroupSubscription groupSubscription; + + private static final int NUMBER_OF_RACKS = 3; + + private static final int MAX_BUCKET_COUNT = 5; + + private ConsumerPartitionAssignor assignor; + + private Cluster metadata; + + private final List allTopicNames = new ArrayList<>(); + + @Setup(Level.Trial) + public void setup() { + // Ensure there are enough racks and brokers for the replication factor = 3. + if (NUMBER_OF_RACKS < 3) { + throw new IllegalArgumentException("Number of broker racks must be at least equal to 3."); + } + + populateTopicMetadata(); + + addMemberSubscriptions(); + + assignor = assignorType.assignor(); + + if (assignmentType == AssignmentType.INCREMENTAL) { + simulateIncrementalRebalance(); + } + } + + private void populateTopicMetadata() { + List partitions = new ArrayList<>(); + int partitionsPerTopicCount = (memberCount * partitionsToMemberRatio) / topicCount; + + // Create nodes (brokers), one for each rack. + List nodes = new ArrayList<>(NUMBER_OF_RACKS); + for (int i = 0; i < NUMBER_OF_RACKS; i++) { + nodes.add(new Node(i, "", i, "rack" + i)); + } + + for (int i = 0; i < topicCount; i++) { + String topicName = "topic" + i; + allTopicNames.add(topicName); + partitions.addAll(partitionInfos(topicName, partitionsPerTopicCount, nodes)); + } + + metadata = new Cluster("test-cluster", nodes, partitions, Collections.emptySet(), Collections.emptySet()); + } + + private void addMemberSubscriptions() { + // In the rebalance case, we will add the last member as a trigger. + // This is done to keep the final members count consistent with the input. + int numberOfMembers = assignmentType.equals(AssignmentType.INCREMENTAL) ? memberCount - 1 : memberCount; + + // When subscriptions are homogeneous, all members are assigned all topics. + if (subscriptionModel == SubscriptionModel.HOMOGENEOUS) { + for (int i = 0; i < numberOfMembers; i++) { + String memberName = "member" + i; + subscriptions.put(memberName, subscription(allTopicNames, i)); + } + } else { + // Adjust bucket count based on member count when member count < max bucket count. + int bucketCount = Math.min(MAX_BUCKET_COUNT, numberOfMembers); + + // Check minimum topics requirement + if (topicCount < bucketCount) { + throw new IllegalArgumentException("At least " + bucketCount + " topics are recommended for effective bucketing."); + } + + int bucketSizeTopics = (int) Math.ceil((double) topicCount / bucketCount); + int bucketSizeMembers = (int) Math.ceil((double) numberOfMembers / bucketCount); + + // Define buckets for each member and assign topics from the same bucket + for (int bucket = 0; bucket < bucketCount; bucket++) { + int memberStartIndex = bucket * bucketSizeMembers; + int memberEndIndex = Math.min((bucket + 1) * bucketSizeMembers, numberOfMembers); + + int topicStartIndex = bucket * bucketSizeTopics; + int topicEndIndex = Math.min((bucket + 1) * bucketSizeTopics, topicCount); + + List bucketTopics = allTopicNames.subList(topicStartIndex, topicEndIndex); + + // Assign topics to each member in the current bucket + for (int i = memberStartIndex; i < memberEndIndex; i++) { + String memberName = "member" + i; + subscriptions.put(memberName, subscription(bucketTopics, i)); + } + } + } + + groupSubscription = new ConsumerPartitionAssignor.GroupSubscription(subscriptions); + } + + private List partitionInfos(String topic, int numberOfPartitions, List nodes) { + // Create PartitionInfo for each partition. + // Replication factor is hardcoded to 2. + List partitionInfos = new ArrayList<>(numberOfPartitions); + for (int i = 0; i < numberOfPartitions; i++) { + Node[] replicas = new Node[3]; + for (int j = 0; j < 3; j++) { + // Assign nodes based on partition number to mimic mkMapOfPartitionRacks logic. + int nodeIndex = (i + j) % NUMBER_OF_RACKS; + replicas[j] = nodes.get(nodeIndex); + } + partitionInfos.add(new PartitionInfo(topic, i, replicas[0], replicas, replicas)); + } + + return partitionInfos; + } + + private ConsumerPartitionAssignor.Subscription subscription(List topics, int consumerIndex) { + Optional rackId = rackId(consumerIndex); + return new ConsumerPartitionAssignor.Subscription( + topics, + null, + Collections.emptyList(), + DEFAULT_GENERATION, + rackId + ); + } + + private Optional rackId(int consumerIndex) { + return isRackAware ? Optional.of("rack" + consumerIndex % NUMBER_OF_RACKS) : Optional.empty(); + } + + private ConsumerPartitionAssignor.Subscription subscriptionWithOwnedPartitions( + List ownedPartitions, + ConsumerPartitionAssignor.Subscription prevSubscription + ) { + return new ConsumerPartitionAssignor.Subscription( + prevSubscription.topics(), + null, + ownedPartitions, + DEFAULT_GENERATION, + prevSubscription.rackId() + ); + } + + private void simulateIncrementalRebalance() { + ConsumerPartitionAssignor.GroupAssignment initialAssignment = assignor.assign(metadata, groupSubscription); + Map newSubscriptions = new HashMap<>(); + subscriptions.forEach((member, subscription) -> + newSubscriptions.put( + member, + subscriptionWithOwnedPartitions( + initialAssignment.groupAssignment().get(member).partitions(), + subscription + ) + ) + ); + + List subscribedTopicsForNewMember; + if (subscriptionModel == SubscriptionModel.HETEROGENEOUS) { + subscribedTopicsForNewMember = subscriptions.get("member" + (memberCount - 2)).topics(); + } else { + subscribedTopicsForNewMember = allTopicNames; + } + + // Add new member to trigger a reassignment. + newSubscriptions.put("newMember", subscription( + subscribedTopicsForNewMember, + memberCount - 1 + )); + + subscriptions = newSubscriptions; + } + + @Benchmark + @Threads(1) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public void doAssignment() { + assignor.assign(metadata, groupSubscription); + } +} diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java new file mode 100644 index 00000000000..ac3bb25f765 --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.jmh.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + + public enum AssignorType { + RANGE(new RangeAssignor()), + UNIFORM(new UniformAssignor()); + + private final PartitionAssignor assignor; + + AssignorType(PartitionAssignor assignor) { + this.assignor = assignor; + } + + public PartitionAssignor assignor() { + return assignor; + } + } + + /** + * The subscription pattern followed by the members of the group. + * + * A subscription model is considered homogenous if all the members of the group + * are subscribed to the same set of topics, it is heterogeneous otherwise. + */ + public enum SubscriptionModel { + HOMOGENEOUS, HETEROGENEOUS + } + + /** + * The assignment type is decided based on whether all the members are assigned partitions + * for the first time (full), or incrementally when a rebalance is triggered. + */ + public enum AssignmentType { + FULL, INCREMENTAL + } + + @Param({"100", "500", "1000", "5000", "10000"}) + private int memberCount; + + @Param({"5", "10", "50"}) + private int partitionsToMemberRatio; + + @Param({"10", "100", "1000"}) + private int topicCount; + + @Param({"true", "false"}) + private boolean isRackAware; + + @Param({"HOMOGENEOUS", "HETEROGENEOUS"}) + private SubscriptionModel subscriptionModel; + + @Param({"RANGE", "UNIFORM"}) + private AssignorType assignorType; + + @Param({"FULL", "INCREMENTAL"}) + private AssignmentType assignmentType; + + private PartitionAssignor partitionAssignor; + + private static final int NUMBER_OF_RACKS = 3; + + private static final int MAX_BUCKET_COUNT = 5; + + private AssignmentSpec assignmentSpec; + + private SubscribedTopicDescriber subscribedTopicDescriber; + + private final List allTopicIds = new ArrayList<>(); + + @Setup(Level.Trial) + public void setup() { + Map topicMetadata = createTopicMetadata(); + subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata); + + createAssignmentSpec(); + + partitionAssignor = assignorType.assignor(); + + if (assignmentType == AssignmentType.INCREMENTAL) { + simulateIncrementalRebalance(); + } + } + + private Map createTopicMetadata() { + Map topicMetadata = new HashMap<>(); + int partitionsPerTopicCount = (memberCount * partitionsToMemberRatio) / topicCount; + + Map> partitionRacks = isRackAware ? + mkMapOfPartitionRacks(partitionsPerTopicCount) : + Collections.emptyMap(); + + for (int i = 0; i < topicCount; i++) { + Uuid topicUuid = Uuid.randomUuid(); + String topicName = "topic" + i; + allTopicIds.add(topicUuid); + topicMetadata.put(topicUuid, new TopicMetadata( + topicUuid, + topicName, + partitionsPerTopicCount, + partitionRacks + )); + } + + return topicMetadata; + } + + private void createAssignmentSpec() { + Map members = new HashMap<>(); + + // In the rebalance case, we will add the last member as a trigger. + // This is done to keep the total members count consistent with the input. + int numberOfMembers = assignmentType.equals(AssignmentType.INCREMENTAL) ? memberCount - 1 : memberCount; + + if (subscriptionModel.equals(SubscriptionModel.HOMOGENEOUS)) { + for (int i = 0; i < numberOfMembers; i++) { + addMemberSpec(members, i, new HashSet<>(allTopicIds)); + } + } else { + // Adjust bucket count based on member count when member count < max bucket count. + int bucketCount = Math.min(MAX_BUCKET_COUNT, numberOfMembers); + + // Check minimum topics requirement + if (topicCount < bucketCount) { + throw new IllegalArgumentException("At least " + bucketCount + " topics are recommended for effective bucketing."); + } + + int bucketSizeTopics = (int) Math.ceil((double) topicCount / bucketCount); + int bucketSizeMembers = (int) Math.ceil((double) numberOfMembers / bucketCount); + + // Define buckets for each member and assign topics from the same bucket + for (int bucket = 0; bucket < bucketCount; bucket++) { + int memberStartIndex = bucket * bucketSizeMembers; + int memberEndIndex = Math.min((bucket + 1) * bucketSizeMembers, numberOfMembers); + + int topicStartIndex = bucket * bucketSizeTopics; + int topicEndIndex = Math.min((bucket + 1) * bucketSizeTopics, topicCount); + + Set bucketTopics = new HashSet<>(allTopicIds.subList(topicStartIndex, topicEndIndex)); + + // Assign topics to each member in the current bucket + for (int i = memberStartIndex; i < memberEndIndex; i++) { + addMemberSpec(members, i, bucketTopics); + } + } + } + + this.assignmentSpec = new AssignmentSpec(members); + } + + private Optional rackId(int memberIndex) { + return isRackAware ? Optional.of("rack" + memberIndex % NUMBER_OF_RACKS) : Optional.empty(); + } + + private void addMemberSpec( + Map members, + int memberIndex, + Set subscribedTopicIds + ) { + String memberId = "member" + memberIndex; + Optional rackId = rackId(memberIndex); + + members.put(memberId, new AssignmentMemberSpec( + Optional.empty(), + rackId, + subscribedTopicIds, + Collections.emptyMap() + )); + } + + private static Map> mkMapOfPartitionRacks(int numPartitions) { + Map> partitionRacks = new HashMap<>(numPartitions); + for (int i = 0; i < numPartitions; i++) { + partitionRacks.put(i, new HashSet<>(Arrays.asList( + "rack" + i % NUMBER_OF_RACKS, + "rack" + (i + 1) % NUMBER_OF_RACKS, + "rack" + (i + 2) % NUMBER_OF_RACKS + ))); + } + return partitionRacks; + } + + private void simulateIncrementalRebalance() { + GroupAssignment initialAssignment = partitionAssignor.assign(assignmentSpec, subscribedTopicDescriber); + Map members = initialAssignment.members(); + + Map updatedMembers = new HashMap<>(); + members.forEach((memberId, memberAssignment) -> { + AssignmentMemberSpec memberSpec = assignmentSpec.members().get(memberId); + updatedMembers.put(memberId, new AssignmentMemberSpec( + memberSpec.instanceId(), + memberSpec.rackId(), + memberSpec.subscribedTopicIds(), + memberAssignment.targetPartitions() + )); + }); + + Collection subscribedTopicIdsForNewMember; + if (subscriptionModel == SubscriptionModel.HETEROGENEOUS) { + subscribedTopicIdsForNewMember = updatedMembers.get("member" + (memberCount - 2)).subscribedTopicIds(); + } else { + subscribedTopicIdsForNewMember = allTopicIds; + } + + Optional rackId = rackId(memberCount - 1); + updatedMembers.put("newMember", new AssignmentMemberSpec( + Optional.empty(), + rackId, + subscribedTopicIdsForNewMember, + Collections.emptyMap() + )); + + assignmentSpec = new AssignmentSpec(updatedMembers); + } + + @Benchmark + @Threads(1) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public void doAssignment() { + partitionAssignor.assign(assignmentSpec, subscribedTopicDescriber); + } +} diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java new file mode 100644 index 00000000000..e39492f107d --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.jmh.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.Assignment; +import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class TargetAssignmentBuilderBenchmark { + + @Param({"100", "500", "1000", "5000", "10000"}) + private int memberCount; + + @Param({"5", "10", "50"}) + private int partitionsToMemberRatio; + + @Param({"10", "100", "1000"}) + private int topicCount; + + private static final String GROUP_ID = "benchmark-group"; + + private static final int GROUP_EPOCH = 0; + + private PartitionAssignor partitionAssignor; + + private Map subscriptionMetadata = Collections.emptyMap(); + + private TargetAssignmentBuilder targetAssignmentBuilder; + + private AssignmentSpec assignmentSpec; + + private final List allTopicNames = new ArrayList<>(); + + private final List allTopicIds = new ArrayList<>(); + + @Setup(Level.Trial) + public void setup() { + // For this benchmark we will use the Uniform Assignor + // and a group that has a homogeneous subscription model. + partitionAssignor = new UniformAssignor(); + + subscriptionMetadata = generateMockSubscriptionMetadata(); + Map members = generateMockMembers(); + Map existingTargetAssignment = generateMockInitialTargetAssignment(); + + ConsumerGroupMember newMember = new ConsumerGroupMember.Builder("newMember") + .setSubscribedTopicNames(allTopicNames) + .build(); + + targetAssignmentBuilder = new TargetAssignmentBuilder(GROUP_ID, GROUP_EPOCH, partitionAssignor) + .withMembers(members) + .withSubscriptionMetadata(subscriptionMetadata) + .withTargetAssignment(existingTargetAssignment) + .addOrUpdateMember(newMember.memberId(), newMember); + } + + private Map generateMockMembers() { + Map members = new HashMap<>(); + + for (int i = 0; i < memberCount - 1; i++) { + ConsumerGroupMember member = new ConsumerGroupMember.Builder("member" + i) + .setSubscribedTopicNames(allTopicNames) + .build(); + members.put("member" + i, member); + } + return members; + } + + private Map generateMockSubscriptionMetadata() { + Map subscriptionMetadata = new HashMap<>(); + int partitionsPerTopicCount = (memberCount * partitionsToMemberRatio) / topicCount; + + for (int i = 0; i < topicCount; i++) { + String topicName = "topic-" + i; + Uuid topicId = Uuid.randomUuid(); + allTopicNames.add(topicName); + allTopicIds.add(topicId); + + TopicMetadata metadata = new TopicMetadata( + topicId, + topicName, + partitionsPerTopicCount, + Collections.emptyMap() + ); + subscriptionMetadata.put(topicName, metadata); + } + + return subscriptionMetadata; + } + + private Map generateMockInitialTargetAssignment() { + Map topicMetadataMap = new HashMap<>(topicCount); + subscriptionMetadata.forEach((topicName, topicMetadata) -> + topicMetadataMap.put( + topicMetadata.id(), + topicMetadata + ) + ); + + createAssignmentSpec(); + + GroupAssignment groupAssignment = partitionAssignor.assign( + assignmentSpec, + new SubscribedTopicMetadata(topicMetadataMap) + ); + + Map initialTargetAssignment = new HashMap<>(memberCount); + + for (Map.Entry entry : groupAssignment.members().entrySet()) { + String memberId = entry.getKey(); + Map> topicPartitions = entry.getValue().targetPartitions(); + + Assignment assignment = new Assignment(topicPartitions); + + initialTargetAssignment.put(memberId, assignment); + } + + return initialTargetAssignment; + } + + private void createAssignmentSpec() { + Map members = new HashMap<>(); + + for (int i = 0; i < memberCount - 1; i++) { + String memberId = "member" + i; + + members.put(memberId, new AssignmentMemberSpec( + Optional.empty(), + Optional.empty(), + allTopicIds, + Collections.emptyMap() + )); + } + assignmentSpec = new AssignmentSpec(members); + } + + @Benchmark + @Threads(1) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public void build() { + targetAssignmentBuilder.build(); + } +}