diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/StreamsAssignorBenchmarkUtils.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/StreamsAssignorBenchmarkUtils.java new file mode 100644 index 00000000000..910b81429b7 --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/StreamsAssignorBenchmarkUtils.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.jmh.assignor; + +import org.apache.kafka.coordinator.group.streams.StreamsGroupMember; +import org.apache.kafka.coordinator.group.streams.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.streams.assignor.GroupSpec; +import org.apache.kafka.coordinator.group.streams.assignor.GroupSpecImpl; +import org.apache.kafka.coordinator.group.streams.topics.ConfiguredInternalTopic; +import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; + + +public class StreamsAssignorBenchmarkUtils { + + /** + * Creates a GroupSpec from the given StreamsGroupMembers. + * + * @param members The StreamsGroupMembers. + * @param assignmentConfigs The assignment configs. + * + * @return The new GroupSpec. + */ + public static GroupSpec createGroupSpec( + Map members, + Map assignmentConfigs + ) { + Map memberSpecs = new HashMap<>(); + + // Prepare the member spec for all members. + for (Map.Entry memberEntry : members.entrySet()) { + String memberId = memberEntry.getKey(); + StreamsGroupMember member = memberEntry.getValue(); + + memberSpecs.put(memberId, new AssignmentMemberSpec( + member.instanceId(), + member.rackId(), + member.assignedTasks().activeTasks(), + member.assignedTasks().standbyTasks(), + member.assignedTasks().warmupTasks(), + member.processId(), + member.clientTags(), + Map.of(), + Map.of() + )); + } + + return new GroupSpecImpl( + memberSpecs, + assignmentConfigs + ); + } + + /** + * Creates a StreamsGroupMembers map where all members have the same topic subscriptions. + * + * @param memberCount The number of members in the group. + * @param membersPerProcess The number of members per process. + * + * @return The new StreamsGroupMembers map. + */ + public static Map createStreamsMembers( + int memberCount, + int membersPerProcess + ) { + Map members = new HashMap<>(); + + for (int i = 0; i < memberCount; i++) { + String memberId = "member-" + i; + String processId = "process-" + i / membersPerProcess; + + members.put(memberId, StreamsGroupMember.Builder.withDefaults(memberId) + .setProcessId(processId) + .build()); + } + + return members; + } + + /** + * Creates a subtopology map with the given number of partitions per topic and a list of topic names. + * For simplicity, each subtopology is associated with a single topic, and every second subtopology + * is stateful (i.e., has a changelog topic). + * + * The number of topics a subtopology is associated with is irrelevant, and + * so is the number of changelog topics. + * + * @param partitionsPerTopic The number of partitions per topic, implies the number of tasks for the subtopology. + * @param allTopicNames All topics names. + * @return A sorted map of subtopology IDs to ConfiguredSubtopology objects. + */ + public static SortedMap createSubtopologyMap( + int partitionsPerTopic, + List allTopicNames + ) { + TreeMap subtopologyMap = new TreeMap<>(); + for (int i = 0; i < allTopicNames.size(); i++) { + String topicName = allTopicNames.get(i); + if (i % 2 == 0) { + subtopologyMap.put(topicName + "_subtopology", new ConfiguredSubtopology(partitionsPerTopic, Set.of(topicName), Map.of(), Set.of(), Map.of( + topicName + "_changelog", new ConfiguredInternalTopic( + topicName + "_changelog", + partitionsPerTopic, + Optional.empty(), + Map.of() + ) + ))); + } else { + subtopologyMap.put(topicName + "_subtopology", new ConfiguredSubtopology(partitionsPerTopic, Set.of(topicName), Map.of(), Set.of(), Map.of())); + } + } + return subtopologyMap; + } +} diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/StreamsStickyAssignorBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/StreamsStickyAssignorBenchmark.java new file mode 100644 index 00000000000..605c6b89caf --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/StreamsStickyAssignorBenchmark.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.jmh.assignor; + +import org.apache.kafka.coordinator.group.streams.StreamsGroupMember; +import org.apache.kafka.coordinator.group.streams.TopologyMetadata; +import org.apache.kafka.coordinator.group.streams.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.streams.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.streams.assignor.GroupSpec; +import org.apache.kafka.coordinator.group.streams.assignor.GroupSpecImpl; +import org.apache.kafka.coordinator.group.streams.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.streams.assignor.StickyTaskAssignor; +import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor; +import org.apache.kafka.coordinator.group.streams.assignor.TopologyDescriber; +import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology; +import org.apache.kafka.image.MetadataImage; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.SortedMap; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class StreamsStickyAssignorBenchmark { + + /** + * The assignment type is decided based on whether all the members are assigned partitions + * for the first time (full), or incrementally when a rebalance is triggered. + */ + public enum AssignmentType { + FULL, INCREMENTAL + } + + @Param({"100", "1000"}) + private int memberCount; + + @Param({"10", "100"}) + private int partitionCount; + + @Param({"10", "100"}) + private int subtopologyCount; + + @Param({"0", "1"}) + private int standbyReplicas; + + @Param({"1", "50"}) + private int membersPerProcess; + + @Param({"FULL", "INCREMENTAL"}) + private AssignmentType assignmentType; + + private TaskAssignor taskAssignor; + + private GroupSpec groupSpec; + + private TopologyDescriber topologyDescriber; + + private Map assignmentConfigs; + + @Setup(Level.Trial) + public void setup() { + List allTopicNames = AssignorBenchmarkUtils.createTopicNames(subtopologyCount); + + SortedMap subtopologyMap = StreamsAssignorBenchmarkUtils.createSubtopologyMap(partitionCount, allTopicNames); + + MetadataImage metadataImage = AssignorBenchmarkUtils.createMetadataImage(allTopicNames, partitionCount); + + topologyDescriber = new TopologyMetadata(metadataImage, subtopologyMap); + + taskAssignor = new StickyTaskAssignor(); + + Map members = createMembers(); + this.assignmentConfigs = Collections.singletonMap( + "num.standby.replicas", + Integer.toString(standbyReplicas) + ); + this.groupSpec = StreamsAssignorBenchmarkUtils.createGroupSpec(members, assignmentConfigs); + + if (assignmentType == AssignmentType.INCREMENTAL) { + simulateIncrementalRebalance(); + } + } + + private Map createMembers() { + // In the rebalance case, we will add the last member as a trigger. + // This is done to keep the total members count consistent with the input. + int numberOfMembers = assignmentType.equals(AssignmentType.INCREMENTAL) ? memberCount - 1 : memberCount; + + return StreamsAssignorBenchmarkUtils.createStreamsMembers( + numberOfMembers, + membersPerProcess + ); + } + + private void simulateIncrementalRebalance() { + GroupAssignment initialAssignment = new StickyTaskAssignor().assign(groupSpec, topologyDescriber); + Map members = initialAssignment.members(); + + Map updatedMemberSpec = new HashMap<>(); + + for (Map.Entry member : groupSpec.members().entrySet()) { + MemberAssignment memberAssignment = members.getOrDefault( + member.getKey(), + new MemberAssignment(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()) + ); + + updatedMemberSpec.put(member.getKey(), new AssignmentMemberSpec( + Optional.empty(), + Optional.empty(), + memberAssignment.activeTasks(), + memberAssignment.standbyTasks(), + memberAssignment.warmupTasks(), + member.getValue().processId(), + Map.of(), + Map.of(), + Map.of() + )); + } + + updatedMemberSpec.put("newMember", new AssignmentMemberSpec( + Optional.empty(), + Optional.empty(), + Map.of(), + Map.of(), + Map.of(), + "process-newMember", + Map.of(), + Map.of(), + Map.of() + )); + + groupSpec = new GroupSpecImpl( + updatedMemberSpec, + assignmentConfigs + ); + } + + @Benchmark + @Threads(1) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public void doAssignment(Blackhole blackhole) { + blackhole.consume(taskAssignor.assign(groupSpec, topologyDescriber)); + } +}