From 9352faa8fc159de139ee6a0b29a9c0deb6b4bafe Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Mon, 23 Sep 2024 15:55:54 +0100 Subject: [PATCH] KAFKA-17495: Factor out assignor benchmark code into utils class (#17133) ServerSideAssignorBenchmark and TargetAssignmentBuilderBenchmark have the same topic and member subscription setup for the most part. Factor out the commonality so that it's easier to share new setups between both benchmarks. Reviewers: David Jacot --- .../jmh/assignor/AssignorBenchmarkUtils.java | 226 ++++++++++++++++++ .../assignor/ServerSideAssignorBenchmark.java | 133 ++++------- .../TargetAssignmentBuilderBenchmark.java | 130 ++++------ 3 files changed, 316 insertions(+), 173 deletions(-) diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java index 4bfcd727852..b9ce9bab9af 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/AssignorBenchmarkUtils.java @@ -20,13 +20,30 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.metadata.PartitionRecord; import org.apache.kafka.common.metadata.TopicRecord; import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.api.assignor.GroupSpec; import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType; +import org.apache.kafka.coordinator.group.modern.Assignment; +import org.apache.kafka.coordinator.group.modern.GroupSpecImpl; +import org.apache.kafka.coordinator.group.modern.MemberSubscriptionAndAssignmentImpl; +import org.apache.kafka.coordinator.group.modern.TopicIds; +import org.apache.kafka.coordinator.group.modern.TopicMetadata; +import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember; import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.MetadataProvenance; +import org.apache.kafka.image.TopicsImage; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; +import java.util.function.Function; + public class AssignorBenchmarkUtils { /** @@ -57,6 +74,215 @@ public class AssignorBenchmarkUtils { return invertedTargetAssignment; } + /** + * Generates a list of topic names for use in benchmarks. + * + * @param topicCount The number of topic names to generate. + * @return The list of topic names. + */ + public static List createTopicNames(int topicCount) { + List topicNames = new ArrayList<>(); + for (int i = 0; i < topicCount; i++) { + topicNames.add("topic-" + i); + } + return topicNames; + } + + /** + * Creates a subscription metadata map for the given topics. + * + * @param topicNames The names of the topics. + * @param partitionsPerTopic The number of partitions per topic. + * @param getTopicPartitionRacks A function to get the racks map for each topic. May return + * an empty map if no rack info is desired. + * @return The subscription metadata map. + */ + public static Map createSubscriptionMetadata( + List topicNames, + int partitionsPerTopic, + Function>> getTopicPartitionRacks + ) { + Map subscriptionMetadata = new HashMap<>(); + + for (String topicName : topicNames) { + Uuid topicId = Uuid.randomUuid(); + + TopicMetadata metadata = new TopicMetadata( + topicId, + topicName, + partitionsPerTopic, + getTopicPartitionRacks.apply(topicName) + ); + subscriptionMetadata.put(topicName, metadata); + } + + return subscriptionMetadata; + } + + /** + * Creates a topic metadata map from the given subscription metadata. + * + * @param subscriptionMetadata The subscription metadata. + * @return The topic metadata map. + */ + public static Map createTopicMetadata( + Map subscriptionMetadata + ) { + Map topicMetadata = new HashMap<>((int) (subscriptionMetadata.size() / 0.75f + 1)); + for (Map.Entry entry : subscriptionMetadata.entrySet()) { + topicMetadata.put(entry.getValue().id(), entry.getValue()); + } + return topicMetadata; + } + + /** + * Creates a TopicsImage from the given subscription metadata. + * + * @param subscriptionMetadata The subscription metadata. + * @return A TopicsImage containing the topic ids, names and partition counts from the + * subscription metadata. + */ + public static TopicsImage createTopicsImage(Map subscriptionMetadata) { + MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY); + + for (Map.Entry entry : subscriptionMetadata.entrySet()) { + TopicMetadata topicMetadata = entry.getValue(); + AssignorBenchmarkUtils.addTopic( + delta, + topicMetadata.id(), + topicMetadata.name(), + topicMetadata.numPartitions() + ); + } + + return delta.apply(MetadataProvenance.EMPTY).topics(); + } + + /** + * Creates a GroupSpec from the given ConsumerGroupMembers. + * + * @param members The ConsumerGroupMembers. + * @param subscriptionType  The group's subscription type. + * @param topicsImage The TopicsImage to use. + * @return The new GroupSpec. + */ + public static GroupSpec createGroupSpec( + Map members, + SubscriptionType subscriptionType, + TopicsImage topicsImage + ) { + Map memberSpecs = new HashMap<>(); + + // Prepare the member spec for all members. + for (Map.Entry memberEntry : members.entrySet()) { + String memberId = memberEntry.getKey(); + ConsumerGroupMember member = memberEntry.getValue(); + + memberSpecs.put(memberId, new MemberSubscriptionAndAssignmentImpl( + Optional.ofNullable(member.rackId()), + Optional.ofNullable(member.instanceId()), + new TopicIds(member.subscribedTopicNames(), topicsImage), + new Assignment(member.assignedPartitions()) + )); + } + + return new GroupSpecImpl( + memberSpecs, + subscriptionType, + Collections.emptyMap() + ); + } + + /** + * Creates a ConsumerGroupMembers map where all members have the same topic subscriptions. + * + * @param memberCount The number of members in the group. + * @param getMemberId A function to map member indices to member ids. + * @param getMemberRackId A function to map member indices to rack ids. + * @param topicNames The topics to subscribe to. + * @return The new ConsumerGroupMembers map. + */ + public static Map createHomogeneousMembers( + int memberCount, + Function getMemberId, + Function> getMemberRackId, + List topicNames + ) { + Map members = new HashMap<>(); + + for (int i = 0; i < memberCount; i++) { + String memberId = getMemberId.apply(i); + Optional rackId = getMemberRackId.apply(i); + + members.put(memberId, new ConsumerGroupMember.Builder("member" + i) + .setRackId(rackId.orElse(null)) + .setSubscribedTopicNames(topicNames) + .build() + ); + } + + return members; + } + + /** + * Creates a ConsumerGroupMembers map where members have different topic subscriptions. + * + * Divides members and topics into a given number of buckets. Within each bucket, members are + * subscribed to the same topics. + * + * @param memberCount The number of members in the group. + * @param bucketCount The number of buckets. + * @param getMemberId A function to map member indices to member ids. + * @param getMemberRackId A function to map member indices to rack ids. + * @param topicNames The topics to subscribe to. + * @return The new ConsumerGroupMembers map. + */ + public static Map createHeterogeneousBucketedMembers( + int memberCount, + int bucketCount, + Function getMemberId, + Function> getMemberRackId, + List topicNames + ) { + Map members = new HashMap<>(); + + // Adjust bucket count based on member count when member count < max bucket count. + bucketCount = Math.min(bucketCount, memberCount); + + // Check minimum topics requirement + if (topicNames.size() < bucketCount) { + throw new IllegalArgumentException("At least " + bucketCount + " topics are recommended for effective bucketing."); + } + + int bucketSizeTopics = (int) Math.ceil((double) topicNames.size() / bucketCount); + int bucketSizeMembers = (int) Math.ceil((double) memberCount / bucketCount); + + // Define buckets for each member and assign topics from the same bucket + for (int bucket = 0; bucket < bucketCount; bucket++) { + int memberStartIndex = bucket * bucketSizeMembers; + int memberEndIndex = Math.min((bucket + 1) * bucketSizeMembers, memberCount); + + int topicStartIndex = bucket * bucketSizeTopics; + int topicEndIndex = Math.min((bucket + 1) * bucketSizeTopics, topicNames.size()); + + List bucketTopicNames = topicNames.subList(topicStartIndex, topicEndIndex); + + // Assign topics to each member in the current bucket + for (int i = memberStartIndex; i < memberEndIndex; i++) { + String memberId = getMemberId.apply(i); + Optional rackId = getMemberRackId.apply(i); + + members.put(memberId, new ConsumerGroupMember.Builder("member" + i) + .setRackId(rackId.orElse(null)) + .setSubscribedTopicNames(bucketTopicNames) + .build() + ); + } + } + + return members; + } + public static void addTopic( MetadataDelta delta, Uuid topicId, diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java index cbd074c8896..82f11350381 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java @@ -18,6 +18,7 @@ package org.apache.kafka.jmh.assignor; import org.apache.kafka.common.Uuid; import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.api.assignor.GroupSpec; import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment; import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignor; import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber; @@ -31,9 +32,7 @@ import org.apache.kafka.coordinator.group.modern.MemberSubscriptionAndAssignment import org.apache.kafka.coordinator.group.modern.SubscribedTopicDescriberImpl; import org.apache.kafka.coordinator.group.modern.TopicIds; import org.apache.kafka.coordinator.group.modern.TopicMetadata; -import org.apache.kafka.image.MetadataDelta; -import org.apache.kafka.image.MetadataImage; -import org.apache.kafka.image.MetadataProvenance; +import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember; import org.apache.kafka.image.TopicsImage; import org.openjdk.jmh.annotations.Benchmark; @@ -50,7 +49,6 @@ import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.Threads; import org.openjdk.jmh.annotations.Warmup; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -122,127 +120,80 @@ public class ServerSideAssignorBenchmark { private static final int MAX_BUCKET_COUNT = 5; - private GroupSpecImpl groupSpec; + private GroupSpec groupSpec; - private SubscribedTopicDescriber subscribedTopicDescriber; + private List allTopicNames = Collections.emptyList(); - private final List allTopicNames = new ArrayList<>(); + private Map subscriptionMetadata = Collections.emptyMap(); private TopicsImage topicsImage = TopicsImage.EMPTY; + private SubscribedTopicDescriber subscribedTopicDescriber; + @Setup(Level.Trial) public void setup() { - Map topicMetadata = createTopicMetadata(); - subscribedTopicDescriber = new SubscribedTopicDescriberImpl(topicMetadata); - - createGroupSpec(); - partitionAssignor = assignorType.assignor(); + setupTopics(); + + Map members = createMembers(); + this.groupSpec = AssignorBenchmarkUtils.createGroupSpec(members, subscriptionType, topicsImage); + if (assignmentType == AssignmentType.INCREMENTAL) { simulateIncrementalRebalance(); } } - private Map createTopicMetadata() { - MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY); - Map topicMetadata = new HashMap<>(); - int partitionsPerTopicCount = (memberCount * partitionsToMemberRatio) / topicCount; + private void setupTopics() { + allTopicNames = AssignorBenchmarkUtils.createTopicNames(topicCount); + int partitionsPerTopic = (memberCount * partitionsToMemberRatio) / topicCount; Map> partitionRacks = isRackAware ? - mkMapOfPartitionRacks(partitionsPerTopicCount) : + mkMapOfPartitionRacks(partitionsPerTopic) : Collections.emptyMap(); + subscriptionMetadata = AssignorBenchmarkUtils.createSubscriptionMetadata( + allTopicNames, + partitionsPerTopic, + topicName -> partitionRacks + ); - for (int i = 0; i < topicCount; i++) { - Uuid topicUuid = Uuid.randomUuid(); - String topicName = "topic" + i; - allTopicNames.add(topicName); - topicMetadata.put(topicUuid, new TopicMetadata( - topicUuid, - topicName, - partitionsPerTopicCount, - partitionRacks - )); + topicsImage = AssignorBenchmarkUtils.createTopicsImage(subscriptionMetadata); - AssignorBenchmarkUtils.addTopic( - delta, - topicUuid, - topicName, - partitionsPerTopicCount - ); - } - - topicsImage = delta.apply(MetadataProvenance.EMPTY).topics(); - return topicMetadata; + Map topicMetadata = AssignorBenchmarkUtils.createTopicMetadata(subscriptionMetadata); + subscribedTopicDescriber = new SubscribedTopicDescriberImpl(topicMetadata); } - private void createGroupSpec() { - Map members = new HashMap<>(); - + private Map createMembers() { // In the rebalance case, we will add the last member as a trigger. // This is done to keep the total members count consistent with the input. int numberOfMembers = assignmentType.equals(AssignmentType.INCREMENTAL) ? memberCount - 1 : memberCount; if (subscriptionType == HOMOGENEOUS) { - for (int i = 0; i < numberOfMembers; i++) { - addMemberSpec(members, i, new TopicIds(new HashSet<>(allTopicNames), topicsImage)); - } + return AssignorBenchmarkUtils.createHomogeneousMembers( + numberOfMembers, + this::memberId, + this::rackId, + allTopicNames + ); } else { - // Adjust bucket count based on member count when member count < max bucket count. - int bucketCount = Math.min(MAX_BUCKET_COUNT, numberOfMembers); - - // Check minimum topics requirement - if (topicCount < bucketCount) { - throw new IllegalArgumentException("At least " + bucketCount + " topics are recommended for effective bucketing."); - } - - int bucketSizeTopics = (int) Math.ceil((double) topicCount / bucketCount); - int bucketSizeMembers = (int) Math.ceil((double) numberOfMembers / bucketCount); - - // Define buckets for each member and assign topics from the same bucket - for (int bucket = 0; bucket < bucketCount; bucket++) { - int memberStartIndex = bucket * bucketSizeMembers; - int memberEndIndex = Math.min((bucket + 1) * bucketSizeMembers, numberOfMembers); - - int topicStartIndex = bucket * bucketSizeTopics; - int topicEndIndex = Math.min((bucket + 1) * bucketSizeTopics, topicCount); - - TopicIds bucketTopics = new TopicIds(new HashSet<>(allTopicNames.subList(topicStartIndex, topicEndIndex)), topicsImage); - - // Assign topics to each member in the current bucket - for (int i = memberStartIndex; i < memberEndIndex; i++) { - addMemberSpec(members, i, bucketTopics); - } - } + return AssignorBenchmarkUtils.createHeterogeneousBucketedMembers( + numberOfMembers, + MAX_BUCKET_COUNT, + this::memberId, + this::rackId, + allTopicNames + ); } + } - this.groupSpec = new GroupSpecImpl( - members, - subscriptionType, - Collections.emptyMap() - ); + private String memberId(int memberIndex) { + return "member" + memberIndex; } private Optional rackId(int memberIndex) { return isRackAware ? Optional.of("rack" + memberIndex % NUMBER_OF_RACKS) : Optional.empty(); } - private void addMemberSpec( - Map members, - int memberIndex, - Set subscribedTopicIds - ) { - String memberId = "member" + memberIndex; - Optional rackId = rackId(memberIndex); - - members.put(memberId, new MemberSubscriptionAndAssignmentImpl( - rackId, - Optional.empty(), - subscribedTopicIds, - Assignment.EMPTY - )); - } - private static Map> mkMapOfPartitionRacks(int numPartitions) { Map> partitionRacks = new HashMap<>(numPartitions); for (int i = 0; i < numPartitions; i++) { @@ -279,7 +230,7 @@ public class ServerSideAssignorBenchmark { Set subscribedTopicIdsForNewMember; if (subscriptionType == HETEROGENEOUS) { - subscribedTopicIdsForNewMember = updatedMemberSpec.get("member" + (memberCount - 2)).subscribedTopicIds(); + subscribedTopicIdsForNewMember = updatedMemberSpec.get(memberId(memberCount - 2)).subscribedTopicIds(); } else { subscribedTopicIdsForNewMember = new TopicIds(new HashSet<>(allTopicNames), topicsImage); } diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java index 4734036d693..adf40671ebc 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java @@ -18,20 +18,16 @@ package org.apache.kafka.jmh.assignor; import org.apache.kafka.common.Uuid; import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.api.assignor.GroupSpec; import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment; import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber; import org.apache.kafka.coordinator.group.assignor.UniformAssignor; import org.apache.kafka.coordinator.group.modern.Assignment; -import org.apache.kafka.coordinator.group.modern.GroupSpecImpl; -import org.apache.kafka.coordinator.group.modern.MemberSubscriptionAndAssignmentImpl; import org.apache.kafka.coordinator.group.modern.SubscribedTopicDescriberImpl; import org.apache.kafka.coordinator.group.modern.TargetAssignmentBuilder; -import org.apache.kafka.coordinator.group.modern.TopicIds; import org.apache.kafka.coordinator.group.modern.TopicMetadata; import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember; -import org.apache.kafka.image.MetadataDelta; -import org.apache.kafka.image.MetadataImage; -import org.apache.kafka.image.MetadataProvenance; import org.apache.kafka.image.TopicsImage; import org.openjdk.jmh.annotations.Benchmark; @@ -48,10 +44,8 @@ import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.Threads; import org.openjdk.jmh.annotations.Warmup; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -83,27 +77,30 @@ public class TargetAssignmentBuilderBenchmark { private PartitionAssignor partitionAssignor; - private Map subscriptionMetadata = Collections.emptyMap(); - private TargetAssignmentBuilder targetAssignmentBuilder; - private GroupSpecImpl groupSpec; + private GroupSpec groupSpec; private Map> invertedTargetAssignment; - private final List allTopicNames = new ArrayList<>(); + private List allTopicNames = Collections.emptyList(); + + private Map subscriptionMetadata = Collections.emptyMap(); private TopicsImage topicsImage; + private SubscribedTopicDescriber subscribedTopicDescriber; + @Setup(Level.Trial) public void setup() { // For this benchmark we will use the Uniform Assignor // and a group that has a homogeneous subscription model. partitionAssignor = new UniformAssignor(); - subscriptionMetadata = generateMockSubscriptionMetadata(); - Map members = generateMockMembers(); - Map existingTargetAssignment = generateMockInitialTargetAssignmentAndUpdateInvertedTargetAssignment(); + setupTopics(); + + Map members = createMembers(); + Map existingTargetAssignment = generateMockInitialTargetAssignmentAndUpdateInvertedTargetAssignment(members); ConsumerGroupMember newMember = new ConsumerGroupMember.Builder("newMember") .setSubscribedTopicNames(allTopicNames) @@ -119,62 +116,34 @@ public class TargetAssignmentBuilderBenchmark { .addOrUpdateMember(newMember.memberId(), newMember); } - private Map generateMockMembers() { - Map members = new HashMap<>(); + private void setupTopics() { + allTopicNames = AssignorBenchmarkUtils.createTopicNames(topicCount); - for (int i = 0; i < memberCount - 1; i++) { - ConsumerGroupMember member = new ConsumerGroupMember.Builder("member" + i) - .setSubscribedTopicNames(allTopicNames) - .build(); - members.put("member" + i, member); - } - return members; - } - - private Map generateMockSubscriptionMetadata() { - Map subscriptionMetadata = new HashMap<>(); - MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY); - int partitionsPerTopicCount = (memberCount * partitionsToMemberRatio) / topicCount; - - for (int i = 0; i < topicCount; i++) { - String topicName = "topic-" + i; - Uuid topicId = Uuid.randomUuid(); - allTopicNames.add(topicName); - - TopicMetadata metadata = new TopicMetadata( - topicId, - topicName, - partitionsPerTopicCount, - Collections.emptyMap() - ); - subscriptionMetadata.put(topicName, metadata); - - AssignorBenchmarkUtils.addTopic( - delta, - topicId, - topicName, - partitionsPerTopicCount - ); - } - - topicsImage = delta.apply(MetadataProvenance.EMPTY).topics(); - return subscriptionMetadata; - } - - private Map generateMockInitialTargetAssignmentAndUpdateInvertedTargetAssignment() { - Map topicMetadataMap = new HashMap<>(topicCount); - subscriptionMetadata.forEach((topicName, topicMetadata) -> - topicMetadataMap.put( - topicMetadata.id(), - topicMetadata - ) + int partitionsPerTopic = (memberCount * partitionsToMemberRatio) / topicCount; + subscriptionMetadata = AssignorBenchmarkUtils.createSubscriptionMetadata( + allTopicNames, + partitionsPerTopic, + topicName -> Collections.emptyMap() ); - createAssignmentSpec(); + topicsImage = AssignorBenchmarkUtils.createTopicsImage(subscriptionMetadata); + + Map topicMetadata = AssignorBenchmarkUtils.createTopicMetadata(subscriptionMetadata); + subscribedTopicDescriber = new SubscribedTopicDescriberImpl(topicMetadata); + } + + private Map generateMockInitialTargetAssignmentAndUpdateInvertedTargetAssignment( + Map members + ) { + this.groupSpec = AssignorBenchmarkUtils.createGroupSpec( + members, + HOMOGENEOUS, + topicsImage + ); GroupAssignment groupAssignment = partitionAssignor.assign( groupSpec, - new SubscribedTopicDescriberImpl(topicMetadataMap) + subscribedTopicDescriber ); invertedTargetAssignment = AssignorBenchmarkUtils.computeInvertedTargetAssignment(groupAssignment); @@ -189,26 +158,23 @@ public class TargetAssignmentBuilderBenchmark { return initialTargetAssignment; } - private void createAssignmentSpec() { - Map members = new HashMap<>(); - - for (int i = 0; i < memberCount - 1; i++) { - String memberId = "member" + i; - - members.put(memberId, new MemberSubscriptionAndAssignmentImpl( - Optional.empty(), - Optional.empty(), - new TopicIds(new HashSet<>(allTopicNames), topicsImage), - Assignment.EMPTY - )); - } - groupSpec = new GroupSpecImpl( - members, - HOMOGENEOUS, - Collections.emptyMap() + private Map createMembers() { + return AssignorBenchmarkUtils.createHomogeneousMembers( + memberCount - 1, + this::memberId, + this::rackId, + allTopicNames ); } + private String memberId(int memberIndex) { + return "member" + memberIndex; + } + + private Optional rackId(int memberIndex) { + return Optional.empty(); + } + @Benchmark @Threads(1) @OutputTimeUnit(TimeUnit.MILLISECONDS)