KAFKA-16568: JMH Benchmarks for Server Side Rebalances (#15717)

This patch adds three benchmarks: one for the client-side assignors, one for the server-side assignors, and one for the target assignment builder.
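
A minimal sketch of driving one of these benchmarks through the JMH Java API (the benchmark class and parameter names come from the files below; the AssignorBenchmarkRunner wrapper itself is hypothetical, and the jmh-benchmarks module's jmh.sh script is the usual entry point):

import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

public class AssignorBenchmarkRunner {
    public static void main(String[] args) throws RunnerException {
        Options opts = new OptionsBuilder()
            .include(ServerSideAssignorBenchmark.class.getSimpleName())
            // Pin a few parameters to narrow the sweep; by default JMH runs
            // every combination of the @Param values.
            .param("memberCount", "100")
            .param("topicCount", "10")
            .param("assignorType", "UNIFORM")
            .build();
        new Runner(opts).run();
    }
}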

Reviewers: David Jacot <djacot@confluent.io>
Ritika Reddy 2024-04-25 07:46:45 -07:00 committed by GitHub
parent 0a6d5ff23c
commit 8013657f5d
3 changed files with 763 additions and 0 deletions

jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ClientSideAssignorBenchmark.java

@@ -0,0 +1,287 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.jmh.assignor;

import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION;

@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 5)
@Measurement(iterations = 5)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class ClientSideAssignorBenchmark {
    public enum AssignorType {
        RANGE(new RangeAssignor()),
        COOPERATIVE_STICKY(new CooperativeStickyAssignor());

        private final ConsumerPartitionAssignor assignor;

        AssignorType(ConsumerPartitionAssignor assignor) {
            this.assignor = assignor;
        }

        public ConsumerPartitionAssignor assignor() {
            return assignor;
        }
    }

    /**
     * The subscription pattern followed by the members of the group.
     *
     * A subscription model is considered homogeneous if all the members of the group
     * are subscribed to the same set of topics; it is heterogeneous otherwise.
     */
    public enum SubscriptionModel {
        HOMOGENEOUS, HETEROGENEOUS
    }

    /**
     * The assignment type is decided based on whether all the members are assigned partitions
     * for the first time (full), or incrementally when a rebalance is triggered.
     */
    public enum AssignmentType {
        FULL, INCREMENTAL
    }

    @Param({"100", "500", "1000", "5000", "10000"})
    private int memberCount;

    @Param({"5", "10", "50"})
    private int partitionsToMemberRatio;

    @Param({"10", "100", "1000"})
    private int topicCount;

    @Param({"true", "false"})
    private boolean isRackAware;

    @Param({"HOMOGENEOUS", "HETEROGENEOUS"})
    private SubscriptionModel subscriptionModel;

    @Param({"RANGE", "COOPERATIVE_STICKY"})
    private AssignorType assignorType;

    @Param({"FULL", "INCREMENTAL"})
    private AssignmentType assignmentType;

    private Map<String, ConsumerPartitionAssignor.Subscription> subscriptions = new HashMap<>();

    private ConsumerPartitionAssignor.GroupSubscription groupSubscription;

    private static final int NUMBER_OF_RACKS = 3;

    private static final int MAX_BUCKET_COUNT = 5;

    private ConsumerPartitionAssignor assignor;

    private Cluster metadata;

    private final List<String> allTopicNames = new ArrayList<>();
    @Setup(Level.Trial)
    public void setup() {
        // Ensure there are enough racks and brokers for the replication factor = 3.
        if (NUMBER_OF_RACKS < 3) {
            throw new IllegalArgumentException("Number of broker racks must be at least equal to 3.");
        }

        populateTopicMetadata();
        addMemberSubscriptions();

        assignor = assignorType.assignor();
        if (assignmentType == AssignmentType.INCREMENTAL) {
            simulateIncrementalRebalance();
        }
    }

    private void populateTopicMetadata() {
        List<PartitionInfo> partitions = new ArrayList<>();
        int partitionsPerTopicCount = (memberCount * partitionsToMemberRatio) / topicCount;
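        // Worked example of the arithmetic above (assumed parameter values):
        // memberCount = 100, partitionsToMemberRatio = 5 and topicCount = 10 give
        // (100 * 5) / 10 = 50 partitions per topic, i.e. 500 partitions in total.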
        // Create nodes (brokers), one for each rack.
        List<Node> nodes = new ArrayList<>(NUMBER_OF_RACKS);
        for (int i = 0; i < NUMBER_OF_RACKS; i++) {
            nodes.add(new Node(i, "", i, "rack" + i));
        }

        for (int i = 0; i < topicCount; i++) {
            String topicName = "topic" + i;
            allTopicNames.add(topicName);
            partitions.addAll(partitionInfos(topicName, partitionsPerTopicCount, nodes));
        }

        metadata = new Cluster("test-cluster", nodes, partitions, Collections.emptySet(), Collections.emptySet());
    }
    private void addMemberSubscriptions() {
        // In the incremental rebalance case, the last member is added separately later,
        // as the trigger for the rebalance. Creating one fewer member here keeps the
        // final member count consistent with the memberCount parameter.
        int numberOfMembers = assignmentType.equals(AssignmentType.INCREMENTAL) ? memberCount - 1 : memberCount;

        // When subscriptions are homogeneous, all members are subscribed to all topics.
        if (subscriptionModel == SubscriptionModel.HOMOGENEOUS) {
            for (int i = 0; i < numberOfMembers; i++) {
                String memberName = "member" + i;
                subscriptions.put(memberName, subscription(allTopicNames, i));
            }
        } else {
            // Adjust the bucket count when the member count is smaller than the maximum bucket count.
            int bucketCount = Math.min(MAX_BUCKET_COUNT, numberOfMembers);

            // Check the minimum topics requirement.
            if (topicCount < bucketCount) {
                throw new IllegalArgumentException("At least " + bucketCount + " topics are recommended for effective bucketing.");
            }

            int bucketSizeTopics = (int) Math.ceil((double) topicCount / bucketCount);
            int bucketSizeMembers = (int) Math.ceil((double) numberOfMembers / bucketCount);
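            // Worked example (assumed values): numberOfMembers = 100, topicCount = 10 and
            // bucketCount = 5 give bucketSizeTopics = 2 and bucketSizeMembers = 20, so
            // members 0-19 subscribe to topics 0-1, members 20-39 to topics 2-3, and so on.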
            // Define buckets for each member and assign topics from the same bucket.
            for (int bucket = 0; bucket < bucketCount; bucket++) {
                int memberStartIndex = bucket * bucketSizeMembers;
                int memberEndIndex = Math.min((bucket + 1) * bucketSizeMembers, numberOfMembers);
                int topicStartIndex = bucket * bucketSizeTopics;
                int topicEndIndex = Math.min((bucket + 1) * bucketSizeTopics, topicCount);

                List<String> bucketTopics = allTopicNames.subList(topicStartIndex, topicEndIndex);

                // Assign topics to each member in the current bucket.
                for (int i = memberStartIndex; i < memberEndIndex; i++) {
                    String memberName = "member" + i;
                    subscriptions.put(memberName, subscription(bucketTopics, i));
                }
            }
        }

        groupSubscription = new ConsumerPartitionAssignor.GroupSubscription(subscriptions);
    }
    private List<PartitionInfo> partitionInfos(String topic, int numberOfPartitions, List<Node> nodes) {
        // Create a PartitionInfo for each partition.
        // The replication factor is hardcoded to 3.
        List<PartitionInfo> partitionInfos = new ArrayList<>(numberOfPartitions);
        for (int i = 0; i < numberOfPartitions; i++) {
            Node[] replicas = new Node[3];
            for (int j = 0; j < 3; j++) {
                // Assign nodes based on the partition number to mimic the mkMapOfPartitionRacks logic.
                int nodeIndex = (i + j) % NUMBER_OF_RACKS;
                replicas[j] = nodes.get(nodeIndex);
            }
            partitionInfos.add(new PartitionInfo(topic, i, replicas[0], replicas, replicas));
        }

        return partitionInfos;
    }
    private ConsumerPartitionAssignor.Subscription subscription(List<String> topics, int consumerIndex) {
        Optional<String> rackId = rackId(consumerIndex);
        return new ConsumerPartitionAssignor.Subscription(
            topics,
            null,
            Collections.emptyList(),
            DEFAULT_GENERATION,
            rackId
        );
    }

    private Optional<String> rackId(int consumerIndex) {
        return isRackAware ? Optional.of("rack" + consumerIndex % NUMBER_OF_RACKS) : Optional.empty();
    }

    private ConsumerPartitionAssignor.Subscription subscriptionWithOwnedPartitions(
        List<TopicPartition> ownedPartitions,
        ConsumerPartitionAssignor.Subscription prevSubscription
    ) {
        return new ConsumerPartitionAssignor.Subscription(
            prevSubscription.topics(),
            null,
            ownedPartitions,
            DEFAULT_GENERATION,
            prevSubscription.rackId()
        );
    }
    private void simulateIncrementalRebalance() {
        ConsumerPartitionAssignor.GroupAssignment initialAssignment = assignor.assign(metadata, groupSubscription);
        Map<String, ConsumerPartitionAssignor.Subscription> newSubscriptions = new HashMap<>();
        subscriptions.forEach((member, subscription) ->
            newSubscriptions.put(
                member,
                subscriptionWithOwnedPartitions(
                    initialAssignment.groupAssignment().get(member).partitions(),
                    subscription
                )
            )
        );

        List<String> subscribedTopicsForNewMember;
        if (subscriptionModel == SubscriptionModel.HETEROGENEOUS) {
            subscribedTopicsForNewMember = subscriptions.get("member" + (memberCount - 2)).topics();
        } else {
            subscribedTopicsForNewMember = allTopicNames;
        }

        // Add a new member to trigger a reassignment.
        newSubscriptions.put("newMember", subscription(
            subscribedTopicsForNewMember,
            memberCount - 1
        ));

        subscriptions = newSubscriptions;
        // Rebuild the group subscription so the benchmark measures the incremental case
        // (owned partitions plus the new member); otherwise it would keep benchmarking
        // the original full subscription captured in addMemberSubscriptions().
        groupSubscription = new ConsumerPartitionAssignor.GroupSubscription(subscriptions);
    }
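    // Each invocation below measures one complete pass of the chosen client-side
    // assignor over the prepared group subscription.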
    @Benchmark
    @Threads(1)
    @OutputTimeUnit(TimeUnit.MILLISECONDS)
    public void doAssignment() {
        assignor.assign(metadata, groupSubscription);
    }
}

jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java

@@ -0,0 +1,281 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.jmh.assignor;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
import org.apache.kafka.coordinator.group.consumer.TopicMetadata;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;

@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 5)
@Measurement(iterations = 5)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class ServerSideAssignorBenchmark {
    public enum AssignorType {
        RANGE(new RangeAssignor()),
        UNIFORM(new UniformAssignor());

        private final PartitionAssignor assignor;

        AssignorType(PartitionAssignor assignor) {
            this.assignor = assignor;
        }

        public PartitionAssignor assignor() {
            return assignor;
        }
    }

    /**
     * The subscription pattern followed by the members of the group.
     *
     * A subscription model is considered homogeneous if all the members of the group
     * are subscribed to the same set of topics; it is heterogeneous otherwise.
     */
    public enum SubscriptionModel {
        HOMOGENEOUS, HETEROGENEOUS
    }

    /**
     * The assignment type is decided based on whether all the members are assigned partitions
     * for the first time (full), or incrementally when a rebalance is triggered.
     */
    public enum AssignmentType {
        FULL, INCREMENTAL
    }

    @Param({"100", "500", "1000", "5000", "10000"})
    private int memberCount;

    @Param({"5", "10", "50"})
    private int partitionsToMemberRatio;

    @Param({"10", "100", "1000"})
    private int topicCount;

    @Param({"true", "false"})
    private boolean isRackAware;

    @Param({"HOMOGENEOUS", "HETEROGENEOUS"})
    private SubscriptionModel subscriptionModel;

    @Param({"RANGE", "UNIFORM"})
    private AssignorType assignorType;

    @Param({"FULL", "INCREMENTAL"})
    private AssignmentType assignmentType;

    private PartitionAssignor partitionAssignor;

    private static final int NUMBER_OF_RACKS = 3;

    private static final int MAX_BUCKET_COUNT = 5;

    private AssignmentSpec assignmentSpec;

    private SubscribedTopicDescriber subscribedTopicDescriber;

    private final List<Uuid> allTopicIds = new ArrayList<>();
    @Setup(Level.Trial)
    public void setup() {
        Map<Uuid, TopicMetadata> topicMetadata = createTopicMetadata();
        subscribedTopicDescriber = new SubscribedTopicMetadata(topicMetadata);

        createAssignmentSpec();

        partitionAssignor = assignorType.assignor();

        if (assignmentType == AssignmentType.INCREMENTAL) {
            simulateIncrementalRebalance();
        }
    }

    private Map<Uuid, TopicMetadata> createTopicMetadata() {
        Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
        int partitionsPerTopicCount = (memberCount * partitionsToMemberRatio) / topicCount;

        Map<Integer, Set<String>> partitionRacks = isRackAware ?
            mkMapOfPartitionRacks(partitionsPerTopicCount) :
            Collections.emptyMap();

        for (int i = 0; i < topicCount; i++) {
            Uuid topicUuid = Uuid.randomUuid();
            String topicName = "topic" + i;
            allTopicIds.add(topicUuid);
            topicMetadata.put(topicUuid, new TopicMetadata(
                topicUuid,
                topicName,
                partitionsPerTopicCount,
                partitionRacks
            ));
        }

        return topicMetadata;
    }
    private void createAssignmentSpec() {
        Map<String, AssignmentMemberSpec> members = new HashMap<>();

        // In the incremental rebalance case, the last member is added separately later,
        // as the trigger for the rebalance. Creating one fewer member here keeps the
        // total member count consistent with the memberCount parameter.
        int numberOfMembers = assignmentType.equals(AssignmentType.INCREMENTAL) ? memberCount - 1 : memberCount;

        if (subscriptionModel.equals(SubscriptionModel.HOMOGENEOUS)) {
            for (int i = 0; i < numberOfMembers; i++) {
                addMemberSpec(members, i, new HashSet<>(allTopicIds));
            }
        } else {
            // Adjust the bucket count when the member count is smaller than the maximum bucket count.
            int bucketCount = Math.min(MAX_BUCKET_COUNT, numberOfMembers);

            // Check the minimum topics requirement.
            if (topicCount < bucketCount) {
                throw new IllegalArgumentException("At least " + bucketCount + " topics are recommended for effective bucketing.");
            }

            int bucketSizeTopics = (int) Math.ceil((double) topicCount / bucketCount);
            int bucketSizeMembers = (int) Math.ceil((double) numberOfMembers / bucketCount);

            // Define buckets for each member and assign topics from the same bucket.
            for (int bucket = 0; bucket < bucketCount; bucket++) {
                int memberStartIndex = bucket * bucketSizeMembers;
                int memberEndIndex = Math.min((bucket + 1) * bucketSizeMembers, numberOfMembers);
                int topicStartIndex = bucket * bucketSizeTopics;
                int topicEndIndex = Math.min((bucket + 1) * bucketSizeTopics, topicCount);

                Set<Uuid> bucketTopics = new HashSet<>(allTopicIds.subList(topicStartIndex, topicEndIndex));

                // Assign topics to each member in the current bucket.
                for (int i = memberStartIndex; i < memberEndIndex; i++) {
                    addMemberSpec(members, i, bucketTopics);
                }
            }
        }

        this.assignmentSpec = new AssignmentSpec(members);
    }
    private Optional<String> rackId(int memberIndex) {
        return isRackAware ? Optional.of("rack" + memberIndex % NUMBER_OF_RACKS) : Optional.empty();
    }

    private void addMemberSpec(
        Map<String, AssignmentMemberSpec> members,
        int memberIndex,
        Set<Uuid> subscribedTopicIds
    ) {
        String memberId = "member" + memberIndex;
        Optional<String> rackId = rackId(memberIndex);

        members.put(memberId, new AssignmentMemberSpec(
            Optional.empty(),
            rackId,
            subscribedTopicIds,
            Collections.emptyMap()
        ));
    }

    private static Map<Integer, Set<String>> mkMapOfPartitionRacks(int numPartitions) {
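        // Each partition i is mapped to racks i % 3, (i + 1) % 3 and (i + 2) % 3; with
        // NUMBER_OF_RACKS = 3 this covers every rack, matching a replication factor of 3.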
        Map<Integer, Set<String>> partitionRacks = new HashMap<>(numPartitions);
        for (int i = 0; i < numPartitions; i++) {
            partitionRacks.put(i, new HashSet<>(Arrays.asList(
                "rack" + i % NUMBER_OF_RACKS,
                "rack" + (i + 1) % NUMBER_OF_RACKS,
                "rack" + (i + 2) % NUMBER_OF_RACKS
            )));
        }
        return partitionRacks;
    }
    private void simulateIncrementalRebalance() {
        GroupAssignment initialAssignment = partitionAssignor.assign(assignmentSpec, subscribedTopicDescriber);
        Map<String, MemberAssignment> members = initialAssignment.members();

        Map<String, AssignmentMemberSpec> updatedMembers = new HashMap<>();
        members.forEach((memberId, memberAssignment) -> {
            AssignmentMemberSpec memberSpec = assignmentSpec.members().get(memberId);
            updatedMembers.put(memberId, new AssignmentMemberSpec(
                memberSpec.instanceId(),
                memberSpec.rackId(),
                memberSpec.subscribedTopicIds(),
                memberAssignment.targetPartitions()
            ));
        });

        Collection<Uuid> subscribedTopicIdsForNewMember;
        if (subscriptionModel == SubscriptionModel.HETEROGENEOUS) {
            subscribedTopicIdsForNewMember = updatedMembers.get("member" + (memberCount - 2)).subscribedTopicIds();
        } else {
            subscribedTopicIdsForNewMember = allTopicIds;
        }

        Optional<String> rackId = rackId(memberCount - 1);

        // Add a new member to trigger a reassignment.
        updatedMembers.put("newMember", new AssignmentMemberSpec(
            Optional.empty(),
            rackId,
            subscribedTopicIdsForNewMember,
            Collections.emptyMap()
        ));

        assignmentSpec = new AssignmentSpec(updatedMembers);
    }
    @Benchmark
    @Threads(1)
    @OutputTimeUnit(TimeUnit.MILLISECONDS)
    public void doAssignment() {
        partitionAssignor.assign(assignmentSpec, subscribedTopicDescriber);
    }
}

jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java

@@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.jmh.assignor;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
import org.apache.kafka.coordinator.group.consumer.Assignment;
import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder;
import org.apache.kafka.coordinator.group.consumer.TopicMetadata;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;

@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 5)
@Measurement(iterations = 5)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class TargetAssignmentBuilderBenchmark {

    @Param({"100", "500", "1000", "5000", "10000"})
    private int memberCount;

    @Param({"5", "10", "50"})
    private int partitionsToMemberRatio;

    @Param({"10", "100", "1000"})
    private int topicCount;

    private static final String GROUP_ID = "benchmark-group";

    private static final int GROUP_EPOCH = 0;

    private PartitionAssignor partitionAssignor;

    private Map<String, TopicMetadata> subscriptionMetadata = Collections.emptyMap();

    private TargetAssignmentBuilder targetAssignmentBuilder;

    private AssignmentSpec assignmentSpec;

    private final List<String> allTopicNames = new ArrayList<>();

    private final List<Uuid> allTopicIds = new ArrayList<>();
    @Setup(Level.Trial)
    public void setup() {
        // For this benchmark we will use the Uniform Assignor
        // and a group that has a homogeneous subscription model.
        partitionAssignor = new UniformAssignor();

        subscriptionMetadata = generateMockSubscriptionMetadata();
        Map<String, ConsumerGroupMember> members = generateMockMembers();

        Map<String, Assignment> existingTargetAssignment = generateMockInitialTargetAssignment();

        ConsumerGroupMember newMember = new ConsumerGroupMember.Builder("newMember")
            .setSubscribedTopicNames(allTopicNames)
            .build();

        targetAssignmentBuilder = new TargetAssignmentBuilder(GROUP_ID, GROUP_EPOCH, partitionAssignor)
            .withMembers(members)
            .withSubscriptionMetadata(subscriptionMetadata)
            .withTargetAssignment(existingTargetAssignment)
            .addOrUpdateMember(newMember.memberId(), newMember);
    }
    private Map<String, ConsumerGroupMember> generateMockMembers() {
        Map<String, ConsumerGroupMember> members = new HashMap<>();
        for (int i = 0; i < memberCount - 1; i++) {
            ConsumerGroupMember member = new ConsumerGroupMember.Builder("member" + i)
                .setSubscribedTopicNames(allTopicNames)
                .build();
            members.put("member" + i, member);
        }
        return members;
    }

    private Map<String, TopicMetadata> generateMockSubscriptionMetadata() {
        Map<String, TopicMetadata> subscriptionMetadata = new HashMap<>();
        int partitionsPerTopicCount = (memberCount * partitionsToMemberRatio) / topicCount;

        for (int i = 0; i < topicCount; i++) {
            String topicName = "topic-" + i;
            Uuid topicId = Uuid.randomUuid();
            allTopicNames.add(topicName);
            allTopicIds.add(topicId);

            TopicMetadata metadata = new TopicMetadata(
                topicId,
                topicName,
                partitionsPerTopicCount,
                Collections.emptyMap()
            );
            subscriptionMetadata.put(topicName, metadata);
        }

        return subscriptionMetadata;
    }
    private Map<String, Assignment> generateMockInitialTargetAssignment() {
        Map<Uuid, TopicMetadata> topicMetadataMap = new HashMap<>(topicCount);
        subscriptionMetadata.forEach((topicName, topicMetadata) ->
            topicMetadataMap.put(
                topicMetadata.id(),
                topicMetadata
            )
        );

        createAssignmentSpec();

        GroupAssignment groupAssignment = partitionAssignor.assign(
            assignmentSpec,
            new SubscribedTopicMetadata(topicMetadataMap)
        );

        Map<String, Assignment> initialTargetAssignment = new HashMap<>(memberCount);

        for (Map.Entry<String, MemberAssignment> entry : groupAssignment.members().entrySet()) {
            String memberId = entry.getKey();
            Map<Uuid, Set<Integer>> topicPartitions = entry.getValue().targetPartitions();

            Assignment assignment = new Assignment(topicPartitions);

            initialTargetAssignment.put(memberId, assignment);
        }

        return initialTargetAssignment;
    }

    private void createAssignmentSpec() {
        Map<String, AssignmentMemberSpec> members = new HashMap<>();
        for (int i = 0; i < memberCount - 1; i++) {
            String memberId = "member" + i;
            members.put(memberId, new AssignmentMemberSpec(
                Optional.empty(),
                Optional.empty(),
                allTopicIds,
                Collections.emptyMap()
            ));
        }
        assignmentSpec = new AssignmentSpec(members);
    }
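    // Each invocation below measures a full target assignment computation: the
    // UniformAssignor runs over all members (the pre-existing ones plus newMember),
    // and the result is reconciled with the existing target assignment.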
    @Benchmark
    @Threads(1)
    @OutputTimeUnit(TimeUnit.MILLISECONDS)
    public void build() {
        targetAssignmentBuilder.build();
    }
}