KAFKA-19477: Sticky Assignor JMH Benchmark (#20118)
CI / build (push) Waiting to run Details

The current assignor used in KIP-1071 is verbatim the assignor used on
the client-side. The assignor performance was not a big concern on the
client-side, and it seems some additional performance overhead has crept
in during the adaptation to the broker-side interfaces, so we expect it
to be too slow for groups of non-trivial size.

We base ourselves on the share-group parameters for these benchmarks:

 - Up to 1000 members      - Up to 100 topics      - Up to 100
partitions per topic

Note, however, that the parameters influencing the Streams assignment
are different and more complicated compared to regular consumer groups /
share consumer groups. The assignment logic is independent of the number
of topics, but depends on the number of subtopologies. A subtopology may
read from multiple topics. We simplify this relationship by assuming one
topic per subtopology Members may be part of the same process or
separate processes. We introduce a parameter membersPerProcess to tune
two extreme configurations (1, 50).

We define 50% of the subtopologies to be stateful. Stateful
subtopologies get standby replicas assigned, if enabled. For example, if
we have 100 subtopologies with 100 partitions, we get 10,000 active
tasks and 5,000 standby tasks. 

Reviewers: Bill Bejeck <bbejeck@apache.org>
This commit is contained in:
Lucas Brutschy 2025-07-09 13:58:03 +02:00 committed by GitHub
parent e42e01eec3
commit dabde76ebf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 316 additions and 0 deletions

View File

@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.jmh.assignor;
import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
import org.apache.kafka.coordinator.group.streams.assignor.AssignmentMemberSpec;
import org.apache.kafka.coordinator.group.streams.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.streams.assignor.GroupSpecImpl;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredInternalTopic;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
public class StreamsAssignorBenchmarkUtils {
/**
* Creates a GroupSpec from the given StreamsGroupMembers.
*
* @param members The StreamsGroupMembers.
* @param assignmentConfigs The assignment configs.
*
* @return The new GroupSpec.
*/
public static GroupSpec createGroupSpec(
Map<String, StreamsGroupMember> members,
Map<String, String> assignmentConfigs
) {
Map<String, AssignmentMemberSpec> memberSpecs = new HashMap<>();
// Prepare the member spec for all members.
for (Map.Entry<String, StreamsGroupMember> memberEntry : members.entrySet()) {
String memberId = memberEntry.getKey();
StreamsGroupMember member = memberEntry.getValue();
memberSpecs.put(memberId, new AssignmentMemberSpec(
member.instanceId(),
member.rackId(),
member.assignedTasks().activeTasks(),
member.assignedTasks().standbyTasks(),
member.assignedTasks().warmupTasks(),
member.processId(),
member.clientTags(),
Map.of(),
Map.of()
));
}
return new GroupSpecImpl(
memberSpecs,
assignmentConfigs
);
}
/**
* Creates a StreamsGroupMembers map where all members have the same topic subscriptions.
*
* @param memberCount The number of members in the group.
* @param membersPerProcess The number of members per process.
*
* @return The new StreamsGroupMembers map.
*/
public static Map<String, StreamsGroupMember> createStreamsMembers(
int memberCount,
int membersPerProcess
) {
Map<String, StreamsGroupMember> members = new HashMap<>();
for (int i = 0; i < memberCount; i++) {
String memberId = "member-" + i;
String processId = "process-" + i / membersPerProcess;
members.put(memberId, StreamsGroupMember.Builder.withDefaults(memberId)
.setProcessId(processId)
.build());
}
return members;
}
/**
* Creates a subtopology map with the given number of partitions per topic and a list of topic names.
* For simplicity, each subtopology is associated with a single topic, and every second subtopology
* is stateful (i.e., has a changelog topic).
*
* The number of topics a subtopology is associated with is irrelevant, and
* so is the number of changelog topics.
*
* @param partitionsPerTopic The number of partitions per topic, implies the number of tasks for the subtopology.
* @param allTopicNames All topics names.
* @return A sorted map of subtopology IDs to ConfiguredSubtopology objects.
*/
public static SortedMap<String, ConfiguredSubtopology> createSubtopologyMap(
int partitionsPerTopic,
List<String> allTopicNames
) {
TreeMap<String, ConfiguredSubtopology> subtopologyMap = new TreeMap<>();
for (int i = 0; i < allTopicNames.size(); i++) {
String topicName = allTopicNames.get(i);
if (i % 2 == 0) {
subtopologyMap.put(topicName + "_subtopology", new ConfiguredSubtopology(partitionsPerTopic, Set.of(topicName), Map.of(), Set.of(), Map.of(
topicName + "_changelog", new ConfiguredInternalTopic(
topicName + "_changelog",
partitionsPerTopic,
Optional.empty(),
Map.of()
)
)));
} else {
subtopologyMap.put(topicName + "_subtopology", new ConfiguredSubtopology(partitionsPerTopic, Set.of(topicName), Map.of(), Set.of(), Map.of()));
}
}
return subtopologyMap;
}
}

View File

@ -0,0 +1,181 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.jmh.assignor;
import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
import org.apache.kafka.coordinator.group.streams.TopologyMetadata;
import org.apache.kafka.coordinator.group.streams.assignor.AssignmentMemberSpec;
import org.apache.kafka.coordinator.group.streams.assignor.GroupAssignment;
import org.apache.kafka.coordinator.group.streams.assignor.GroupSpec;
import org.apache.kafka.coordinator.group.streams.assignor.GroupSpecImpl;
import org.apache.kafka.coordinator.group.streams.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.streams.assignor.StickyTaskAssignor;
import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor;
import org.apache.kafka.coordinator.group.streams.assignor.TopologyDescriber;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
import org.apache.kafka.image.MetadataImage;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 5)
@Measurement(iterations = 5)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class StreamsStickyAssignorBenchmark {
/**
* The assignment type is decided based on whether all the members are assigned partitions
* for the first time (full), or incrementally when a rebalance is triggered.
*/
public enum AssignmentType {
FULL, INCREMENTAL
}
@Param({"100", "1000"})
private int memberCount;
@Param({"10", "100"})
private int partitionCount;
@Param({"10", "100"})
private int subtopologyCount;
@Param({"0", "1"})
private int standbyReplicas;
@Param({"1", "50"})
private int membersPerProcess;
@Param({"FULL", "INCREMENTAL"})
private AssignmentType assignmentType;
private TaskAssignor taskAssignor;
private GroupSpec groupSpec;
private TopologyDescriber topologyDescriber;
private Map<String, String> assignmentConfigs;
@Setup(Level.Trial)
public void setup() {
List<String> allTopicNames = AssignorBenchmarkUtils.createTopicNames(subtopologyCount);
SortedMap<String, ConfiguredSubtopology> subtopologyMap = StreamsAssignorBenchmarkUtils.createSubtopologyMap(partitionCount, allTopicNames);
MetadataImage metadataImage = AssignorBenchmarkUtils.createMetadataImage(allTopicNames, partitionCount);
topologyDescriber = new TopologyMetadata(metadataImage, subtopologyMap);
taskAssignor = new StickyTaskAssignor();
Map<String, StreamsGroupMember> members = createMembers();
this.assignmentConfigs = Collections.singletonMap(
"num.standby.replicas",
Integer.toString(standbyReplicas)
);
this.groupSpec = StreamsAssignorBenchmarkUtils.createGroupSpec(members, assignmentConfigs);
if (assignmentType == AssignmentType.INCREMENTAL) {
simulateIncrementalRebalance();
}
}
private Map<String, StreamsGroupMember> createMembers() {
// In the rebalance case, we will add the last member as a trigger.
// This is done to keep the total members count consistent with the input.
int numberOfMembers = assignmentType.equals(AssignmentType.INCREMENTAL) ? memberCount - 1 : memberCount;
return StreamsAssignorBenchmarkUtils.createStreamsMembers(
numberOfMembers,
membersPerProcess
);
}
private void simulateIncrementalRebalance() {
GroupAssignment initialAssignment = new StickyTaskAssignor().assign(groupSpec, topologyDescriber);
Map<String, MemberAssignment> members = initialAssignment.members();
Map<String, AssignmentMemberSpec> updatedMemberSpec = new HashMap<>();
for (Map.Entry<String, AssignmentMemberSpec> member : groupSpec.members().entrySet()) {
MemberAssignment memberAssignment = members.getOrDefault(
member.getKey(),
new MemberAssignment(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap())
);
updatedMemberSpec.put(member.getKey(), new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
memberAssignment.activeTasks(),
memberAssignment.standbyTasks(),
memberAssignment.warmupTasks(),
member.getValue().processId(),
Map.of(),
Map.of(),
Map.of()
));
}
updatedMemberSpec.put("newMember", new AssignmentMemberSpec(
Optional.empty(),
Optional.empty(),
Map.of(),
Map.of(),
Map.of(),
"process-newMember",
Map.of(),
Map.of(),
Map.of()
));
groupSpec = new GroupSpecImpl(
updatedMemberSpec,
assignmentConfigs
);
}
@Benchmark
@Threads(1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void doAssignment(Blackhole blackhole) {
blackhole.consume(taskAssignor.assign(groupSpec, topologyDescriber));
}
}