From dabde76ebf105aaa945db60b7753331c83a8c989 Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Wed, 9 Jul 2025 13:58:03 +0200 Subject: [PATCH] KAFKA-19477: Sticky Assignor JMH Benchmark (#20118) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The current assignor used in KIP-1071 is verbatim the assignor used on the client-side. The assignor performance was not a big concern on the client-side, and it seems some additional performance overhead has crept in during the adaptation to the broker-side interfaces, so we expect it to be too slow for groups of non-trivial size. We base ourselves on the share-group parameters for these benchmarks: - Up to 1000 members - Up to 100 topics - Up to 100 partitions per topic Note, however, that the parameters influencing the Streams assignment are different and more complicated compared to regular consumer groups / share consumer groups. The assignment logic is independent of the number of topics, but depends on the number of subtopologies. A subtopology may read from multiple topics. We simplify this relationship by assuming one topic per subtopology. Members may be part of the same process or separate processes. We introduce a parameter membersPerProcess to tune two extreme configurations (1, 50). We define 50% of the subtopologies to be stateful. Stateful subtopologies get standby replicas assigned, if enabled. For example, if we have 100 subtopologies with 100 partitions each, we get 10,000 active tasks and 5,000 standby tasks.  
Reviewers: Bill Bejeck --- .../StreamsAssignorBenchmarkUtils.java | 135 +++++++++++++ .../StreamsStickyAssignorBenchmark.java | 181 ++++++++++++++++++ 2 files changed, 316 insertions(+) create mode 100644 jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/StreamsAssignorBenchmarkUtils.java create mode 100644 jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/StreamsStickyAssignorBenchmark.java diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/StreamsAssignorBenchmarkUtils.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/StreamsAssignorBenchmarkUtils.java new file mode 100644 index 00000000000..910b81429b7 --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/StreamsAssignorBenchmarkUtils.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.jmh.assignor; + +import org.apache.kafka.coordinator.group.streams.StreamsGroupMember; +import org.apache.kafka.coordinator.group.streams.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.streams.assignor.GroupSpec; +import org.apache.kafka.coordinator.group.streams.assignor.GroupSpecImpl; +import org.apache.kafka.coordinator.group.streams.topics.ConfiguredInternalTopic; +import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; + + +public class StreamsAssignorBenchmarkUtils { + + /** + * Creates a GroupSpec from the given StreamsGroupMembers. + * + * @param members The StreamsGroupMembers, keyed by member ID. + * @param assignmentConfigs The assignment configs, passed to the GroupSpec unchanged. + * + * @return The new GroupSpec. + */ + public static GroupSpec createGroupSpec( + Map<String, StreamsGroupMember> members, + Map<String, String> assignmentConfigs + ) { + Map<String, AssignmentMemberSpec> memberSpecs = new HashMap<>(); + + // Prepare the member spec for all members. + for (Map.Entry<String, StreamsGroupMember> memberEntry : members.entrySet()) { + String memberId = memberEntry.getKey(); + StreamsGroupMember member = memberEntry.getValue(); + + memberSpecs.put(memberId, new AssignmentMemberSpec( + member.instanceId(), + member.rackId(), + member.assignedTasks().activeTasks(), + member.assignedTasks().standbyTasks(), + member.assignedTasks().warmupTasks(), + member.processId(), + member.clientTags(), + Map.of(), + Map.of() + )); + } + + return new GroupSpecImpl( + memberSpecs, + assignmentConfigs + ); + } + + /** + * Creates a StreamsGroupMembers map of members built from the builder defaults, with consecutive members grouped into processes. + * + * @param memberCount The number of members in the group. + * @param membersPerProcess The number of members per process; must be non-zero because it is used as a divisor. + * + * @return The new StreamsGroupMembers map, keyed by member ID. 
+ */ + public static Map<String, StreamsGroupMember> createStreamsMembers( + int memberCount, + int membersPerProcess + ) { + Map<String, StreamsGroupMember> members = new HashMap<>(); + + for (int i = 0; i < memberCount; i++) { + String memberId = "member-" + i; + String processId = "process-" + i / membersPerProcess; // integer division: consecutive members share a process + + members.put(memberId, StreamsGroupMember.Builder.withDefaults(memberId) + .setProcessId(processId) + .build()); + } + + return members; + } + + /** + * Creates a subtopology map with the given number of partitions per topic and a list of topic names. + * For simplicity, each subtopology is associated with a single topic, and every second subtopology + * is stateful (i.e., has a changelog topic). + * + * The number of topics a subtopology is associated with is irrelevant, and + * so is the number of changelog topics. + * + * @param partitionsPerTopic The number of partitions per topic, implies the number of tasks for the subtopology. + * @param allTopicNames All topic names. + * @return A sorted map of subtopology IDs to ConfiguredSubtopology objects. 
+ */ + public static SortedMap<String, ConfiguredSubtopology> createSubtopologyMap( + int partitionsPerTopic, + List<String> allTopicNames + ) { + TreeMap<String, ConfiguredSubtopology> subtopologyMap = new TreeMap<>(); + for (int i = 0; i < allTopicNames.size(); i++) { + String topicName = allTopicNames.get(i); + if (i % 2 == 0) { // even-indexed subtopologies are stateful: they get one changelog topic + subtopologyMap.put(topicName + "_subtopology", new ConfiguredSubtopology(partitionsPerTopic, Set.of(topicName), Map.of(), Set.of(), Map.of( + topicName + "_changelog", new ConfiguredInternalTopic( + topicName + "_changelog", + partitionsPerTopic, + Optional.empty(), + Map.of() + ) + ))); + } else { // odd-indexed subtopologies are created without a changelog topic + subtopologyMap.put(topicName + "_subtopology", new ConfiguredSubtopology(partitionsPerTopic, Set.of(topicName), Map.of(), Set.of(), Map.of())); + } + } + return subtopologyMap; + } +} diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/StreamsStickyAssignorBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/StreamsStickyAssignorBenchmark.java new file mode 100644 index 00000000000..605c6b89caf --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/StreamsStickyAssignorBenchmark.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.jmh.assignor; + +import org.apache.kafka.coordinator.group.streams.StreamsGroupMember; +import org.apache.kafka.coordinator.group.streams.TopologyMetadata; +import org.apache.kafka.coordinator.group.streams.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.streams.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.streams.assignor.GroupSpec; +import org.apache.kafka.coordinator.group.streams.assignor.GroupSpecImpl; +import org.apache.kafka.coordinator.group.streams.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.streams.assignor.StickyTaskAssignor; +import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor; +import org.apache.kafka.coordinator.group.streams.assignor.TopologyDescriber; +import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology; +import org.apache.kafka.image.MetadataImage; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.SortedMap; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class StreamsStickyAssignorBenchmark { + + /** + * The 
assignment type is decided based on whether all the members are assigned tasks + * for the first time (full), or incrementally when a rebalance is triggered. + */ + public enum AssignmentType { + FULL, INCREMENTAL + } + + @Param({"100", "1000"}) + private int memberCount; + + @Param({"10", "100"}) + private int partitionCount; + + @Param({"10", "100"}) + private int subtopologyCount; + + @Param({"0", "1"}) + private int standbyReplicas; + + @Param({"1", "50"}) + private int membersPerProcess; + + @Param({"FULL", "INCREMENTAL"}) + private AssignmentType assignmentType; + + private TaskAssignor taskAssignor; + + private GroupSpec groupSpec; + + private TopologyDescriber topologyDescriber; + + private Map<String, String> assignmentConfigs; + + @Setup(Level.Trial) + public void setup() { + List<String> allTopicNames = AssignorBenchmarkUtils.createTopicNames(subtopologyCount); + + SortedMap<String, ConfiguredSubtopology> subtopologyMap = StreamsAssignorBenchmarkUtils.createSubtopologyMap(partitionCount, allTopicNames); + + MetadataImage metadataImage = AssignorBenchmarkUtils.createMetadataImage(allTopicNames, partitionCount); + + topologyDescriber = new TopologyMetadata(metadataImage, subtopologyMap); + + taskAssignor = new StickyTaskAssignor(); + + Map<String, StreamsGroupMember> members = createMembers(); + this.assignmentConfigs = Collections.singletonMap( + "num.standby.replicas", + Integer.toString(standbyReplicas) + ); + this.groupSpec = StreamsAssignorBenchmarkUtils.createGroupSpec(members, assignmentConfigs); + + if (assignmentType == AssignmentType.INCREMENTAL) { + simulateIncrementalRebalance(); + } + } + + private Map<String, StreamsGroupMember> createMembers() { + // In the rebalance case, we will add the last member as a trigger. + // This is done to keep the total members count consistent with the input. + int numberOfMembers = assignmentType.equals(AssignmentType.INCREMENTAL) ? 
memberCount - 1 : memberCount; + + return StreamsAssignorBenchmarkUtils.createStreamsMembers( + numberOfMembers, + membersPerProcess + ); + } + + private void simulateIncrementalRebalance() { + GroupAssignment initialAssignment = new StickyTaskAssignor().assign(groupSpec, topologyDescriber); + Map<String, MemberAssignment> members = initialAssignment.members(); + + Map<String, AssignmentMemberSpec> updatedMemberSpec = new HashMap<>(); + + for (Map.Entry<String, AssignmentMemberSpec> member : groupSpec.members().entrySet()) { + MemberAssignment memberAssignment = members.getOrDefault( + member.getKey(), + new MemberAssignment(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()) + ); + + updatedMemberSpec.put(member.getKey(), new AssignmentMemberSpec( + Optional.empty(), + Optional.empty(), + memberAssignment.activeTasks(), + memberAssignment.standbyTasks(), + memberAssignment.warmupTasks(), + member.getValue().processId(), + Map.of(), + Map.of(), + Map.of() + )); + } + + updatedMemberSpec.put("newMember", new AssignmentMemberSpec( // the joining member: no previously assigned tasks, its own process + Optional.empty(), + Optional.empty(), + Map.of(), + Map.of(), + Map.of(), + "process-newMember", + Map.of(), + Map.of(), + Map.of() + )); + + groupSpec = new GroupSpecImpl( + updatedMemberSpec, + assignmentConfigs + ); + } + + @Benchmark + @Threads(1) + @OutputTimeUnit(TimeUnit.MILLISECONDS) + public void doAssignment(Blackhole blackhole) { + blackhole.consume(taskAssignor.assign(groupSpec, topologyDescriber)); + } +}