KAFKA-17495: Factor out assignor benchmark code into utils class (#17133)

ServerSideAssignorBenchmark and TargetAssignmentBuilderBenchmark have
the same topic and member subscription setup for the most part. Factor
out the commonality so that it's easier to share new setups between both
benchmarks.

Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
Sean Quah 2024-09-23 15:55:54 +01:00 committed by GitHub
parent a407cc3a31
commit 9352faa8fc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 316 additions and 173 deletions

View File

@ -20,13 +20,30 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metadata.PartitionRecord; import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.TopicRecord; import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment; import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment; import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.GroupSpecImpl;
import org.apache.kafka.coordinator.group.modern.MemberSubscriptionAndAssignmentImpl;
import org.apache.kafka.coordinator.group.modern.TopicIds;
import org.apache.kafka.coordinator.group.modern.TopicMetadata;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.TopicsImage;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.function.Function;
public class AssignorBenchmarkUtils { public class AssignorBenchmarkUtils {
/** /**
@ -57,6 +74,215 @@ public class AssignorBenchmarkUtils {
return invertedTargetAssignment; return invertedTargetAssignment;
} }
/**
* Generates a list of topic names for use in benchmarks.
*
* @param topicCount The number of topic names to generate.
* @return The list of topic names.
*/
public static List<String> createTopicNames(int topicCount) {
List<String> topicNames = new ArrayList<>();
for (int i = 0; i < topicCount; i++) {
topicNames.add("topic-" + i);
}
return topicNames;
}
/**
* Creates a subscription metadata map for the given topics.
*
* @param topicNames The names of the topics.
* @param partitionsPerTopic The number of partitions per topic.
* @param getTopicPartitionRacks A function to get the racks map for each topic. May return
* an empty map if no rack info is desired.
* @return The subscription metadata map.
*/
public static Map<String, TopicMetadata> createSubscriptionMetadata(
List<String> topicNames,
int partitionsPerTopic,
Function<String, Map<Integer, Set<String>>> getTopicPartitionRacks
) {
Map<String, TopicMetadata> subscriptionMetadata = new HashMap<>();
for (String topicName : topicNames) {
Uuid topicId = Uuid.randomUuid();
TopicMetadata metadata = new TopicMetadata(
topicId,
topicName,
partitionsPerTopic,
getTopicPartitionRacks.apply(topicName)
);
subscriptionMetadata.put(topicName, metadata);
}
return subscriptionMetadata;
}
/**
* Creates a topic metadata map from the given subscription metadata.
*
* @param subscriptionMetadata The subscription metadata.
* @return The topic metadata map.
*/
public static Map<Uuid, TopicMetadata> createTopicMetadata(
Map<String, TopicMetadata> subscriptionMetadata
) {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>((int) (subscriptionMetadata.size() / 0.75f + 1));
for (Map.Entry<String, TopicMetadata> entry : subscriptionMetadata.entrySet()) {
topicMetadata.put(entry.getValue().id(), entry.getValue());
}
return topicMetadata;
}
/**
* Creates a TopicsImage from the given subscription metadata.
*
* @param subscriptionMetadata The subscription metadata.
* @return A TopicsImage containing the topic ids, names and partition counts from the
* subscription metadata.
*/
public static TopicsImage createTopicsImage(Map<String, TopicMetadata> subscriptionMetadata) {
MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
for (Map.Entry<String, TopicMetadata> entry : subscriptionMetadata.entrySet()) {
TopicMetadata topicMetadata = entry.getValue();
AssignorBenchmarkUtils.addTopic(
delta,
topicMetadata.id(),
topicMetadata.name(),
topicMetadata.numPartitions()
);
}
return delta.apply(MetadataProvenance.EMPTY).topics();
}
/**
* Creates a GroupSpec from the given ConsumerGroupMembers.
*
* @param members The ConsumerGroupMembers.
* @param subscriptionType  The group's subscription type.
* @param topicsImage The TopicsImage to use.
* @return The new GroupSpec.
*/
public static GroupSpec createGroupSpec(
Map<String, ConsumerGroupMember> members,
SubscriptionType subscriptionType,
TopicsImage topicsImage
) {
Map<String, MemberSubscriptionAndAssignmentImpl> memberSpecs = new HashMap<>();
// Prepare the member spec for all members.
for (Map.Entry<String, ConsumerGroupMember> memberEntry : members.entrySet()) {
String memberId = memberEntry.getKey();
ConsumerGroupMember member = memberEntry.getValue();
memberSpecs.put(memberId, new MemberSubscriptionAndAssignmentImpl(
Optional.ofNullable(member.rackId()),
Optional.ofNullable(member.instanceId()),
new TopicIds(member.subscribedTopicNames(), topicsImage),
new Assignment(member.assignedPartitions())
));
}
return new GroupSpecImpl(
memberSpecs,
subscriptionType,
Collections.emptyMap()
);
}
/**
* Creates a ConsumerGroupMembers map where all members have the same topic subscriptions.
*
* @param memberCount The number of members in the group.
* @param getMemberId A function to map member indices to member ids.
* @param getMemberRackId A function to map member indices to rack ids.
* @param topicNames The topics to subscribe to.
* @return The new ConsumerGroupMembers map.
*/
public static Map<String, ConsumerGroupMember> createHomogeneousMembers(
int memberCount,
Function<Integer, String> getMemberId,
Function<Integer, Optional<String>> getMemberRackId,
List<String> topicNames
) {
Map<String, ConsumerGroupMember> members = new HashMap<>();
for (int i = 0; i < memberCount; i++) {
String memberId = getMemberId.apply(i);
Optional<String> rackId = getMemberRackId.apply(i);
members.put(memberId, new ConsumerGroupMember.Builder("member" + i)
.setRackId(rackId.orElse(null))
.setSubscribedTopicNames(topicNames)
.build()
);
}
return members;
}
/**
* Creates a ConsumerGroupMembers map where members have different topic subscriptions.
*
* Divides members and topics into a given number of buckets. Within each bucket, members are
* subscribed to the same topics.
*
* @param memberCount The number of members in the group.
* @param bucketCount The number of buckets.
* @param getMemberId A function to map member indices to member ids.
* @param getMemberRackId A function to map member indices to rack ids.
* @param topicNames The topics to subscribe to.
* @return The new ConsumerGroupMembers map.
*/
public static Map<String, ConsumerGroupMember> createHeterogeneousBucketedMembers(
int memberCount,
int bucketCount,
Function<Integer, String> getMemberId,
Function<Integer, Optional<String>> getMemberRackId,
List<String> topicNames
) {
Map<String, ConsumerGroupMember> members = new HashMap<>();
// Adjust bucket count based on member count when member count < max bucket count.
bucketCount = Math.min(bucketCount, memberCount);
// Check minimum topics requirement
if (topicNames.size() < bucketCount) {
throw new IllegalArgumentException("At least " + bucketCount + " topics are recommended for effective bucketing.");
}
int bucketSizeTopics = (int) Math.ceil((double) topicNames.size() / bucketCount);
int bucketSizeMembers = (int) Math.ceil((double) memberCount / bucketCount);
// Define buckets for each member and assign topics from the same bucket
for (int bucket = 0; bucket < bucketCount; bucket++) {
int memberStartIndex = bucket * bucketSizeMembers;
int memberEndIndex = Math.min((bucket + 1) * bucketSizeMembers, memberCount);
int topicStartIndex = bucket * bucketSizeTopics;
int topicEndIndex = Math.min((bucket + 1) * bucketSizeTopics, topicNames.size());
List<String> bucketTopicNames = topicNames.subList(topicStartIndex, topicEndIndex);
// Assign topics to each member in the current bucket
for (int i = memberStartIndex; i < memberEndIndex; i++) {
String memberId = getMemberId.apply(i);
Optional<String> rackId = getMemberRackId.apply(i);
members.put(memberId, new ConsumerGroupMember.Builder("member" + i)
.setRackId(rackId.orElse(null))
.setSubscribedTopicNames(bucketTopicNames)
.build()
);
}
}
return members;
}
public static void addTopic( public static void addTopic(
MetadataDelta delta, MetadataDelta delta,
Uuid topicId, Uuid topicId,

View File

@ -18,6 +18,7 @@ package org.apache.kafka.jmh.assignor;
import org.apache.kafka.common.Uuid; import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment; import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment; import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignor; import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber; import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
@ -31,9 +32,7 @@ import org.apache.kafka.coordinator.group.modern.MemberSubscriptionAndAssignment
import org.apache.kafka.coordinator.group.modern.SubscribedTopicDescriberImpl; import org.apache.kafka.coordinator.group.modern.SubscribedTopicDescriberImpl;
import org.apache.kafka.coordinator.group.modern.TopicIds; import org.apache.kafka.coordinator.group.modern.TopicIds;
import org.apache.kafka.coordinator.group.modern.TopicMetadata; import org.apache.kafka.coordinator.group.modern.TopicMetadata;
import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.TopicsImage; import org.apache.kafka.image.TopicsImage;
import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.Benchmark;
@ -50,7 +49,6 @@ import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads; import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.annotations.Warmup;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -122,127 +120,80 @@ public class ServerSideAssignorBenchmark {
private static final int MAX_BUCKET_COUNT = 5; private static final int MAX_BUCKET_COUNT = 5;
private GroupSpecImpl groupSpec; private GroupSpec groupSpec;
private SubscribedTopicDescriber subscribedTopicDescriber; private List<String> allTopicNames = Collections.emptyList();
private final List<String> allTopicNames = new ArrayList<>(); private Map<String, TopicMetadata> subscriptionMetadata = Collections.emptyMap();
private TopicsImage topicsImage = TopicsImage.EMPTY; private TopicsImage topicsImage = TopicsImage.EMPTY;
private SubscribedTopicDescriber subscribedTopicDescriber;
@Setup(Level.Trial) @Setup(Level.Trial)
public void setup() { public void setup() {
Map<Uuid, TopicMetadata> topicMetadata = createTopicMetadata();
subscribedTopicDescriber = new SubscribedTopicDescriberImpl(topicMetadata);
createGroupSpec();
partitionAssignor = assignorType.assignor(); partitionAssignor = assignorType.assignor();
setupTopics();
Map<String, ConsumerGroupMember> members = createMembers();
this.groupSpec = AssignorBenchmarkUtils.createGroupSpec(members, subscriptionType, topicsImage);
if (assignmentType == AssignmentType.INCREMENTAL) { if (assignmentType == AssignmentType.INCREMENTAL) {
simulateIncrementalRebalance(); simulateIncrementalRebalance();
} }
} }
private Map<Uuid, TopicMetadata> createTopicMetadata() { private void setupTopics() {
MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY); allTopicNames = AssignorBenchmarkUtils.createTopicNames(topicCount);
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
int partitionsPerTopicCount = (memberCount * partitionsToMemberRatio) / topicCount;
int partitionsPerTopic = (memberCount * partitionsToMemberRatio) / topicCount;
Map<Integer, Set<String>> partitionRacks = isRackAware ? Map<Integer, Set<String>> partitionRacks = isRackAware ?
mkMapOfPartitionRacks(partitionsPerTopicCount) : mkMapOfPartitionRacks(partitionsPerTopic) :
Collections.emptyMap(); Collections.emptyMap();
subscriptionMetadata = AssignorBenchmarkUtils.createSubscriptionMetadata(
allTopicNames,
partitionsPerTopic,
topicName -> partitionRacks
);
for (int i = 0; i < topicCount; i++) { topicsImage = AssignorBenchmarkUtils.createTopicsImage(subscriptionMetadata);
Uuid topicUuid = Uuid.randomUuid();
String topicName = "topic" + i;
allTopicNames.add(topicName);
topicMetadata.put(topicUuid, new TopicMetadata(
topicUuid,
topicName,
partitionsPerTopicCount,
partitionRacks
));
AssignorBenchmarkUtils.addTopic( Map<Uuid, TopicMetadata> topicMetadata = AssignorBenchmarkUtils.createTopicMetadata(subscriptionMetadata);
delta, subscribedTopicDescriber = new SubscribedTopicDescriberImpl(topicMetadata);
topicUuid,
topicName,
partitionsPerTopicCount
);
}
topicsImage = delta.apply(MetadataProvenance.EMPTY).topics();
return topicMetadata;
} }
private void createGroupSpec() { private Map<String, ConsumerGroupMember> createMembers() {
Map<String, MemberSubscriptionAndAssignmentImpl> members = new HashMap<>();
// In the rebalance case, we will add the last member as a trigger. // In the rebalance case, we will add the last member as a trigger.
// This is done to keep the total members count consistent with the input. // This is done to keep the total members count consistent with the input.
int numberOfMembers = assignmentType.equals(AssignmentType.INCREMENTAL) ? memberCount - 1 : memberCount; int numberOfMembers = assignmentType.equals(AssignmentType.INCREMENTAL) ? memberCount - 1 : memberCount;
if (subscriptionType == HOMOGENEOUS) { if (subscriptionType == HOMOGENEOUS) {
for (int i = 0; i < numberOfMembers; i++) { return AssignorBenchmarkUtils.createHomogeneousMembers(
addMemberSpec(members, i, new TopicIds(new HashSet<>(allTopicNames), topicsImage)); numberOfMembers,
} this::memberId,
this::rackId,
allTopicNames
);
} else { } else {
// Adjust bucket count based on member count when member count < max bucket count. return AssignorBenchmarkUtils.createHeterogeneousBucketedMembers(
int bucketCount = Math.min(MAX_BUCKET_COUNT, numberOfMembers); numberOfMembers,
MAX_BUCKET_COUNT,
// Check minimum topics requirement this::memberId,
if (topicCount < bucketCount) { this::rackId,
throw new IllegalArgumentException("At least " + bucketCount + " topics are recommended for effective bucketing."); allTopicNames
} );
int bucketSizeTopics = (int) Math.ceil((double) topicCount / bucketCount);
int bucketSizeMembers = (int) Math.ceil((double) numberOfMembers / bucketCount);
// Define buckets for each member and assign topics from the same bucket
for (int bucket = 0; bucket < bucketCount; bucket++) {
int memberStartIndex = bucket * bucketSizeMembers;
int memberEndIndex = Math.min((bucket + 1) * bucketSizeMembers, numberOfMembers);
int topicStartIndex = bucket * bucketSizeTopics;
int topicEndIndex = Math.min((bucket + 1) * bucketSizeTopics, topicCount);
TopicIds bucketTopics = new TopicIds(new HashSet<>(allTopicNames.subList(topicStartIndex, topicEndIndex)), topicsImage);
// Assign topics to each member in the current bucket
for (int i = memberStartIndex; i < memberEndIndex; i++) {
addMemberSpec(members, i, bucketTopics);
}
}
} }
}
this.groupSpec = new GroupSpecImpl( private String memberId(int memberIndex) {
members, return "member" + memberIndex;
subscriptionType,
Collections.emptyMap()
);
} }
private Optional<String> rackId(int memberIndex) { private Optional<String> rackId(int memberIndex) {
return isRackAware ? Optional.of("rack" + memberIndex % NUMBER_OF_RACKS) : Optional.empty(); return isRackAware ? Optional.of("rack" + memberIndex % NUMBER_OF_RACKS) : Optional.empty();
} }
private void addMemberSpec(
Map<String, MemberSubscriptionAndAssignmentImpl> members,
int memberIndex,
Set<Uuid> subscribedTopicIds
) {
String memberId = "member" + memberIndex;
Optional<String> rackId = rackId(memberIndex);
members.put(memberId, new MemberSubscriptionAndAssignmentImpl(
rackId,
Optional.empty(),
subscribedTopicIds,
Assignment.EMPTY
));
}
private static Map<Integer, Set<String>> mkMapOfPartitionRacks(int numPartitions) { private static Map<Integer, Set<String>> mkMapOfPartitionRacks(int numPartitions) {
Map<Integer, Set<String>> partitionRacks = new HashMap<>(numPartitions); Map<Integer, Set<String>> partitionRacks = new HashMap<>(numPartitions);
for (int i = 0; i < numPartitions; i++) { for (int i = 0; i < numPartitions; i++) {
@ -279,7 +230,7 @@ public class ServerSideAssignorBenchmark {
Set<Uuid> subscribedTopicIdsForNewMember; Set<Uuid> subscribedTopicIdsForNewMember;
if (subscriptionType == HETEROGENEOUS) { if (subscriptionType == HETEROGENEOUS) {
subscribedTopicIdsForNewMember = updatedMemberSpec.get("member" + (memberCount - 2)).subscribedTopicIds(); subscribedTopicIdsForNewMember = updatedMemberSpec.get(memberId(memberCount - 2)).subscribedTopicIds();
} else { } else {
subscribedTopicIdsForNewMember = new TopicIds(new HashSet<>(allTopicNames), topicsImage); subscribedTopicIdsForNewMember = new TopicIds(new HashSet<>(allTopicNames), topicsImage);
} }

View File

@ -18,20 +18,16 @@ package org.apache.kafka.jmh.assignor;
import org.apache.kafka.common.Uuid; import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment; import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment; import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignor; import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
import org.apache.kafka.coordinator.group.assignor.UniformAssignor; import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
import org.apache.kafka.coordinator.group.modern.Assignment; import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.GroupSpecImpl;
import org.apache.kafka.coordinator.group.modern.MemberSubscriptionAndAssignmentImpl;
import org.apache.kafka.coordinator.group.modern.SubscribedTopicDescriberImpl; import org.apache.kafka.coordinator.group.modern.SubscribedTopicDescriberImpl;
import org.apache.kafka.coordinator.group.modern.TargetAssignmentBuilder; import org.apache.kafka.coordinator.group.modern.TargetAssignmentBuilder;
import org.apache.kafka.coordinator.group.modern.TopicIds;
import org.apache.kafka.coordinator.group.modern.TopicMetadata; import org.apache.kafka.coordinator.group.modern.TopicMetadata;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember; import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.TopicsImage; import org.apache.kafka.image.TopicsImage;
import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.Benchmark;
@ -48,10 +44,8 @@ import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads; import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.annotations.Warmup;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
@ -83,27 +77,30 @@ public class TargetAssignmentBuilderBenchmark {
private PartitionAssignor partitionAssignor; private PartitionAssignor partitionAssignor;
private Map<String, TopicMetadata> subscriptionMetadata = Collections.emptyMap();
private TargetAssignmentBuilder<ConsumerGroupMember> targetAssignmentBuilder; private TargetAssignmentBuilder<ConsumerGroupMember> targetAssignmentBuilder;
private GroupSpecImpl groupSpec; private GroupSpec groupSpec;
private Map<Uuid, Map<Integer, String>> invertedTargetAssignment; private Map<Uuid, Map<Integer, String>> invertedTargetAssignment;
private final List<String> allTopicNames = new ArrayList<>(); private List<String> allTopicNames = Collections.emptyList();
private Map<String, TopicMetadata> subscriptionMetadata = Collections.emptyMap();
private TopicsImage topicsImage; private TopicsImage topicsImage;
private SubscribedTopicDescriber subscribedTopicDescriber;
@Setup(Level.Trial) @Setup(Level.Trial)
public void setup() { public void setup() {
// For this benchmark we will use the Uniform Assignor // For this benchmark we will use the Uniform Assignor
// and a group that has a homogeneous subscription model. // and a group that has a homogeneous subscription model.
partitionAssignor = new UniformAssignor(); partitionAssignor = new UniformAssignor();
subscriptionMetadata = generateMockSubscriptionMetadata(); setupTopics();
Map<String, ConsumerGroupMember> members = generateMockMembers();
Map<String, Assignment> existingTargetAssignment = generateMockInitialTargetAssignmentAndUpdateInvertedTargetAssignment(); Map<String, ConsumerGroupMember> members = createMembers();
Map<String, Assignment> existingTargetAssignment = generateMockInitialTargetAssignmentAndUpdateInvertedTargetAssignment(members);
ConsumerGroupMember newMember = new ConsumerGroupMember.Builder("newMember") ConsumerGroupMember newMember = new ConsumerGroupMember.Builder("newMember")
.setSubscribedTopicNames(allTopicNames) .setSubscribedTopicNames(allTopicNames)
@ -119,62 +116,34 @@ public class TargetAssignmentBuilderBenchmark {
.addOrUpdateMember(newMember.memberId(), newMember); .addOrUpdateMember(newMember.memberId(), newMember);
} }
private Map<String, ConsumerGroupMember> generateMockMembers() { private void setupTopics() {
Map<String, ConsumerGroupMember> members = new HashMap<>(); allTopicNames = AssignorBenchmarkUtils.createTopicNames(topicCount);
for (int i = 0; i < memberCount - 1; i++) { int partitionsPerTopic = (memberCount * partitionsToMemberRatio) / topicCount;
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member" + i) subscriptionMetadata = AssignorBenchmarkUtils.createSubscriptionMetadata(
.setSubscribedTopicNames(allTopicNames) allTopicNames,
.build(); partitionsPerTopic,
members.put("member" + i, member); topicName -> Collections.emptyMap()
}
return members;
}
private Map<String, TopicMetadata> generateMockSubscriptionMetadata() {
Map<String, TopicMetadata> subscriptionMetadata = new HashMap<>();
MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
int partitionsPerTopicCount = (memberCount * partitionsToMemberRatio) / topicCount;
for (int i = 0; i < topicCount; i++) {
String topicName = "topic-" + i;
Uuid topicId = Uuid.randomUuid();
allTopicNames.add(topicName);
TopicMetadata metadata = new TopicMetadata(
topicId,
topicName,
partitionsPerTopicCount,
Collections.emptyMap()
);
subscriptionMetadata.put(topicName, metadata);
AssignorBenchmarkUtils.addTopic(
delta,
topicId,
topicName,
partitionsPerTopicCount
);
}
topicsImage = delta.apply(MetadataProvenance.EMPTY).topics();
return subscriptionMetadata;
}
private Map<String, Assignment> generateMockInitialTargetAssignmentAndUpdateInvertedTargetAssignment() {
Map<Uuid, TopicMetadata> topicMetadataMap = new HashMap<>(topicCount);
subscriptionMetadata.forEach((topicName, topicMetadata) ->
topicMetadataMap.put(
topicMetadata.id(),
topicMetadata
)
); );
createAssignmentSpec(); topicsImage = AssignorBenchmarkUtils.createTopicsImage(subscriptionMetadata);
Map<Uuid, TopicMetadata> topicMetadata = AssignorBenchmarkUtils.createTopicMetadata(subscriptionMetadata);
subscribedTopicDescriber = new SubscribedTopicDescriberImpl(topicMetadata);
}
private Map<String, Assignment> generateMockInitialTargetAssignmentAndUpdateInvertedTargetAssignment(
Map<String, ConsumerGroupMember> members
) {
this.groupSpec = AssignorBenchmarkUtils.createGroupSpec(
members,
HOMOGENEOUS,
topicsImage
);
GroupAssignment groupAssignment = partitionAssignor.assign( GroupAssignment groupAssignment = partitionAssignor.assign(
groupSpec, groupSpec,
new SubscribedTopicDescriberImpl(topicMetadataMap) subscribedTopicDescriber
); );
invertedTargetAssignment = AssignorBenchmarkUtils.computeInvertedTargetAssignment(groupAssignment); invertedTargetAssignment = AssignorBenchmarkUtils.computeInvertedTargetAssignment(groupAssignment);
@ -189,26 +158,23 @@ public class TargetAssignmentBuilderBenchmark {
return initialTargetAssignment; return initialTargetAssignment;
} }
private void createAssignmentSpec() { private Map<String, ConsumerGroupMember> createMembers() {
Map<String, MemberSubscriptionAndAssignmentImpl> members = new HashMap<>(); return AssignorBenchmarkUtils.createHomogeneousMembers(
memberCount - 1,
for (int i = 0; i < memberCount - 1; i++) { this::memberId,
String memberId = "member" + i; this::rackId,
allTopicNames
members.put(memberId, new MemberSubscriptionAndAssignmentImpl(
Optional.empty(),
Optional.empty(),
new TopicIds(new HashSet<>(allTopicNames), topicsImage),
Assignment.EMPTY
));
}
groupSpec = new GroupSpecImpl(
members,
HOMOGENEOUS,
Collections.emptyMap()
); );
} }
private String memberId(int memberIndex) {
return "member" + memberIndex;
}
private Optional<String> rackId(int memberIndex) {
return Optional.empty();
}
@Benchmark @Benchmark
@Threads(1) @Threads(1)
@OutputTimeUnit(TimeUnit.MILLISECONDS) @OutputTimeUnit(TimeUnit.MILLISECONDS)