KAFKA-19478 [2/N]: Remove task pairs (#20127)
CI / build (push) Waiting to run Details

Task pairs is an optimization that is enabled in the current sticky task
assignor.

The basic idea is that every time we add a task A to a client that has
tasks B, C, we add pairs (A, B) and (A, C) to a global collection of
pairs. When adding a standby task, we then prioritize creating standby
tasks that create new task pairs. If this does not work, we fall back to
the original behavior.

The complexity of this optimization is fairly significant, and its
usefulness is questionable: the HighAvailabilityAssignor does not seem
to have such an optimization, and the absence of this optimization does
not seem to have caused any problems that I know of. I could not find
any indication of what this optimization is actually trying to achieve.

A side effect of it is that we will sometimes avoid “small loops”, such
as

        Node A: ActiveTask1, StandbyTask2
        Node B: ActiveTask2, StandbyTask1
        Node C: ActiveTask3, StandbyTask4
        Node D: ActiveTask4, StandbyTask3

In a small loop like this, losing two nodes will, in the worst case,
cause 2 tasks to go down, so the assignor prefers

        Node A: ActiveTask1, StandbyTask4
        Node B: ActiveTask2, StandbyTask1
        Node C: ActiveTask3, StandbyTask2
        Node D: ActiveTask4, StandbyTask3

Which is a “big loop” assignment, where worst-case losing two nodes will
cause at most 1 task to be unavailable. However, this optimization seems
fairly niche, and also the current implementation does not seem to
implement it in a direct form, but a more relaxed constraint which
usually, but not always, avoids small loops. So it remains unclear
whether this is really the intention behind the optimization. The
current unit tests of the StickyTaskAssignor pass even after removing
the optimization.

The pairs optimization has a worst-case quadratic space and time
complexity in the number of tasks, and makes a lot of other optimizations
impossible, so I’d suggest we remove it. I don’t think, in its current
form, it is suitable to be implemented in a broker-side assignor. Note,
however, if we identify a useful effect of the code in the future, we
can work on finding an efficient algorithm that can bring the
optimization to our broker-side assignor.

This reduces the runtime of our worst case benchmark by 10x.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This commit is contained in:
Lucas Brutschy 2025-07-14 09:13:51 +02:00 committed by GitHub
parent b6fce13e3a
commit 29cf97b9ad
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 7 additions and 183 deletions

View File

@ -25,7 +25,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@ -91,8 +90,6 @@ public class StickyTaskAssignor implements TaskAssignor {
localState.totalCapacity = groupSpec.members().size();
localState.tasksPerMember = computeTasksPerMember(localState.allTasks, localState.totalCapacity);
localState.taskPairs = new TaskPairs(localState.allTasks * (localState.allTasks - 1) / 2);
localState.processIdToState = new HashMap<>();
localState.activeTaskToPrevMember = new HashMap<>();
localState.standbyTaskToPrevMember = new HashMap<>();
@ -175,7 +172,7 @@ public class StickyTaskAssignor implements TaskAssignor {
final Member prevMember = localState.activeTaskToPrevMember.get(task);
if (prevMember != null && hasUnfulfilledQuota(prevMember)) {
localState.processIdToState.get(prevMember.processId).addTask(prevMember.memberId, task, true);
updateHelpers(prevMember, task, true);
updateHelpers(prevMember, true);
it.remove();
}
}
@ -187,7 +184,7 @@ public class StickyTaskAssignor implements TaskAssignor {
final Member prevMember = findMemberWithLeastLoad(prevMembers, task, true);
if (prevMember != null && hasUnfulfilledQuota(prevMember)) {
localState.processIdToState.get(prevMember.processId).addTask(prevMember.memberId, task, true);
updateHelpers(prevMember, task, true);
updateHelpers(prevMember, true);
it.remove();
}
}
@ -204,7 +201,7 @@ public class StickyTaskAssignor implements TaskAssignor {
}
localState.processIdToState.get(member.processId).addTask(member.memberId, task, true);
it.remove();
updateHelpers(member, task, true);
updateHelpers(member, true);
}
}
@ -221,20 +218,10 @@ public class StickyTaskAssignor implements TaskAssignor {
if (members == null || members.isEmpty()) {
return null;
}
Set<Member> rightPairs = members.stream()
.filter(member -> localState.taskPairs.hasNewPair(taskId, localState.processIdToState.get(member.processId).assignedTasks()))
.collect(Collectors.toSet());
if (rightPairs.isEmpty()) {
rightPairs = members;
}
Optional<ProcessState> processWithLeastLoad = rightPairs.stream()
Optional<ProcessState> processWithLeastLoad = members.stream()
.map(member -> localState.processIdToState.get(member.processId))
.min(Comparator.comparingDouble(ProcessState::load));
// processWithLeastLoad must be present at this point, but we do a double check
if (processWithLeastLoad.isEmpty()) {
return null;
}
// if the same exact former member is needed
if (returnSameMember) {
return localState.standbyTaskToPrevMember.get(taskId).stream()
@ -275,8 +262,7 @@ public class StickyTaskAssignor implements TaskAssignor {
// prev active task
Member prevMember = localState.activeTaskToPrevMember.get(task);
if (prevMember != null && availableProcesses.contains(prevMember.processId) && isLoadBalanced(prevMember.processId)
&& localState.taskPairs.hasNewPair(task, localState.processIdToState.get(prevMember.processId).assignedTasks())) {
if (prevMember != null && availableProcesses.contains(prevMember.processId) && isLoadBalanced(prevMember.processId)) {
standby = prevMember;
}
@ -304,7 +290,7 @@ public class StickyTaskAssignor implements TaskAssignor {
}
}
localState.processIdToState.get(standby.processId).addTask(standby.memberId, task, false);
updateHelpers(standby, task, false);
updateHelpers(standby, false);
}
}
@ -323,10 +309,7 @@ public class StickyTaskAssignor implements TaskAssignor {
return process.hasCapacity() || isLeastLoadedProcess;
}
private void updateHelpers(final Member member, final TaskId taskId, final boolean isActive) {
// add all pair combinations: update taskPairs
localState.taskPairs.addPairs(taskId, localState.processIdToState.get(member.processId).assignedTasks());
private void updateHelpers(final Member member, final boolean isActive) {
if (isActive) {
// update task per process
maybeUpdateTasksPerMember(localState.processIdToState.get(member.processId).activeTaskCount());
@ -344,75 +327,6 @@ public class StickyTaskAssignor implements TaskAssignor {
return tasksPerMember;
}
// Tracks which unordered pairs of tasks have been co-located on the same
// process, so the assignor can prefer placements that create new pairs.
// NOTE(review): this is the code being removed by this commit.
private static class TaskPairs {
// All unordered task pairs created so far by assignments.
private final Set<Pair> pairs;
// Total number of distinct pairs possible: n * (n - 1) / 2 for n tasks.
private final int maxPairs;
TaskPairs(final int maxPairs) {
this.maxPairs = maxPairs;
this.pairs = new HashSet<>(maxPairs);
}
// Returns true if co-locating task1 with the given taskIds would create at
// least one pair not seen before. When every possible pair already exists,
// no placement can create a new pair, so we short-circuit to false.
boolean hasNewPair(final TaskId task1,
final Set<TaskId> taskIds) {
if (pairs.size() == maxPairs) {
return false;
}
// An empty assignment trivially yields a new pair.
if (taskIds.size() == 0) {
return true;
}
for (final TaskId taskId : taskIds) {
if (!pairs.contains(pair(task1, taskId))) {
return true;
}
}
return false;
}
// Records the pair (taskId, id) for every already-assigned task id;
// self-pairs are skipped.
void addPairs(final TaskId taskId, final Set<TaskId> assigned) {
for (final TaskId id : assigned) {
if (!id.equals(taskId))
pairs.add(pair(id, taskId));
}
}
// Canonicalizes ordering so (a, b) and (b, a) map to the same Pair key.
Pair pair(final TaskId task1, final TaskId task2) {
if (task1.compareTo(task2) < 0) {
return new Pair(task1, task2);
}
return new Pair(task2, task1);
}
// Value type for a (canonically ordered) pair of tasks; equality and hash
// are based on both components so it can be used as a HashSet element.
private static class Pair {
private final TaskId task1;
private final TaskId task2;
Pair(final TaskId task1, final TaskId task2) {
this.task1 = task1;
this.task2 = task2;
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final Pair pair = (Pair) o;
return Objects.equals(task1, pair.task1) &&
Objects.equals(task2, pair.task2);
}
@Override
public int hashCode() {
return Objects.hash(task1, task2);
}
}
}
static class Member {
private final String processId;
private final String memberId;
@ -425,7 +339,6 @@ public class StickyTaskAssignor implements TaskAssignor {
private static class LocalState {
// helper data structures:
private TaskPairs taskPairs;
Map<TaskId, Member> activeTaskToPrevMember;
Map<TaskId, Set<Member>> standbyTaskToPrevMember;
Map<String, ProcessState> processIdToState;

View File

@ -32,12 +32,10 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -581,86 +579,6 @@ public class StickyTaskAssignorTest {
assertEquals(4, getAllActiveTaskCount(result, "member1"));
}
// With 4 fresh members, 4 tasks, and one standby replica, verifies that no two
// members receive exactly the same combined (active + standby) task set.
// NOTE(review): this test is being removed by this commit, since the pairs
// optimization it exercised is deleted.
@Test
public void shouldNotHaveSameAssignmentOnAnyTwoHosts() {
final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1");
final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2");
final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3");
final AssignmentMemberSpec memberSpec4 = createAssignmentMemberSpec("process4");
final List<String> allMemberIds = asList("member1", "member2", "member3", "member4");
Map<String, AssignmentMemberSpec> members = mkMap(
mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3), mkEntry("member4", memberSpec4));
final GroupAssignment result = assignor.assign(
new GroupSpecImpl(members,
mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, "1"))),
new TopologyDescriberImpl(4, true, List.of("test-subtopology"))
);
// Compare every member's full task-id list against every other member's.
for (final String memberId : allMemberIds) {
final List<Integer> taskIds = getAllTaskIds(result, memberId);
for (final String otherMemberId : allMemberIds) {
if (!memberId.equals(otherMemberId)) {
assertNotEquals(taskIds, getAllTaskIds(result, otherMemberId));
}
}
}
}
// Same "no two members get identical task sets" check, but with three members
// reporting previous active tasks, exercising the sticky (re-)assignment path.
// NOTE(review): this test is being removed by this commit, since the pairs
// optimization it exercised is deleted.
@Test
public void shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousActiveTasks() {
final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", Sets.newSet(1, 2))), Map.of());
final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", Sets.newSet(3))), Map.of());
final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3", mkMap(mkEntry("test-subtopology", Sets.newSet(0))), Map.of());
final AssignmentMemberSpec memberSpec4 = createAssignmentMemberSpec("process4");
final List<String> allMemberIds = asList("member1", "member2", "member3", "member4");
Map<String, AssignmentMemberSpec> members = mkMap(
mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3), mkEntry("member4", memberSpec4));
final GroupAssignment result = assignor.assign(
new GroupSpecImpl(members,
mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, "1"))),
new TopologyDescriberImpl(4, true, List.of("test-subtopology"))
);
// Compare every member's full task-id list against every other member's.
for (final String memberId : allMemberIds) {
final List<Integer> taskIds = getAllTaskIds(result, memberId);
for (final String otherMemberId : allMemberIds) {
if (!memberId.equals(otherMemberId)) {
assertNotEquals(taskIds, getAllTaskIds(result, otherMemberId));
}
}
}
}
// Same "no two members get identical task sets" check, but with two members
// reporting previous active AND standby tasks, exercising standby stickiness.
// NOTE(review): this test is being removed by this commit, since the pairs
// optimization it exercised is deleted.
@Test
public void shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousStandbyTasks() {
final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1",
mkMap(mkEntry("test-subtopology", Sets.newSet(1, 2))), mkMap(mkEntry("test-subtopology", Sets.newSet(3, 0))));
final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2",
mkMap(mkEntry("test-subtopology", Sets.newSet(3, 0))), mkMap(mkEntry("test-subtopology", Sets.newSet(1, 2))));
final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3");
final AssignmentMemberSpec memberSpec4 = createAssignmentMemberSpec("process4");
final List<String> allMemberIds = asList("member1", "member2", "member3", "member4");
Map<String, AssignmentMemberSpec> members = mkMap(
mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3), mkEntry("member4", memberSpec4));
final GroupAssignment result = assignor.assign(
new GroupSpecImpl(members,
mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, "1"))),
new TopologyDescriberImpl(4, true, List.of("test-subtopology"))
);
// Compare every member's full task-id list against every other member's.
for (final String memberId : allMemberIds) {
final List<Integer> taskIds = getAllTaskIds(result, memberId);
for (final String otherMemberId : allMemberIds) {
if (!memberId.equals(otherMemberId)) {
assertNotEquals(taskIds, getAllTaskIds(result, otherMemberId));
}
}
}
}
@Test
public void shouldReBalanceTasksAcrossAllClientsWhenCapacityAndTaskCountTheSame() {
final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3", mkMap(mkEntry("test-subtopology", Sets.newSet(0, 1, 2, 3))), Map.of());
@ -1020,13 +938,6 @@ public class StickyTaskAssignorTest {
return res;
}
// Collects every task id (active plus standby) assigned to the given members,
// by concatenating the results of the two sibling helpers.
// NOTE(review): this helper is being removed by this commit together with the
// tests that used it.
private List<Integer> getAllTaskIds(GroupAssignment result, String... memberIds) {
List<Integer> res = new ArrayList<>();
res.addAll(getAllActiveTaskIds(result, memberIds));
res.addAll(getAllStandbyTaskIds(result, memberIds));
return res;
}
private Map<String, Set<Integer>> mergeAllStandbyTasks(GroupAssignment result, String... memberIds) {
Map<String, Set<Integer>> res = new HashMap<>();
for (String memberId : memberIds) {