From 29cf97b9ad75f01108e338b78a315d6019ad1953 Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Mon, 14 Jul 2025 09:13:51 +0200 Subject: [PATCH] KAFKA-19478 [2/N]: Remove task pairs (#20127) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Task pairs is an optimization that is enabled in the current sticky task assignor. The basic idea is that every time we add a task A to a client that has tasks B, C, we add pairs (A, B) and (A, C) to a global collection of pairs. When adding a standby task, we then prioritize creating standby tasks that create new task pairs. If this does not work, we fall back to the original behavior. The complexity of this optimization is fairly significant, and its usefulness is questionable: the HighAvailabilityAssignor does not seem to have such an optimization, and the absence of this optimization does not seem to have caused any problems that I know of. I could not find any explanation of what this optimization is actually trying to achieve. A side effect of it is that we will sometimes avoid “small loops”, such as Node A: ActiveTask1, StandbyTask2 Node B: ActiveTask2, StandbyTask1 Node C: ActiveTask3, StandbyTask4 Node D: ActiveTask4, StandbyTask3 In a small loop like this, losing two nodes will, in the worst case, cause 2 tasks to go down, so the assignor prefers Node A: ActiveTask1, StandbyTask4 Node B: ActiveTask2, StandbyTask1 Node C: ActiveTask3, StandbyTask2 Node D: ActiveTask4, StandbyTask3 This is a “big loop” assignment, where worst-case losing two nodes will cause at most 1 task to be unavailable. However, this optimization seems fairly niche, and also the current implementation does not seem to implement it in a direct form, but rather as a more relaxed constraint which usually, but not always, avoids small loops. So it remains unclear whether this is really the intention behind the optimization. The current unit tests of the StickyTaskAssignor pass even after removing the optimization. 
The pairs optimization has a worst-case quadratic space and time complexity in the number of tasks, and make a lot of other optimizations impossible, so I’d suggest we remove it. I don’t think, in its current form, it is suitable to be implemented in a broker-side assignor. Note, however, if we identify a useful effect of the code in the future, we can work on finding an efficient algorithm that can bring the optimization to our broker-side assignor. This reduces the runtime of our worst case benchmark by 10x. Reviewers: Lucas Brutschy --- .../streams/assignor/StickyTaskAssignor.java | 101 ++---------------- .../assignor/StickyTaskAssignorTest.java | 89 --------------- 2 files changed, 7 insertions(+), 183 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java index b1f1d9b1a11..f455bb577eb 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java @@ -25,7 +25,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -91,8 +90,6 @@ public class StickyTaskAssignor implements TaskAssignor { localState.totalCapacity = groupSpec.members().size(); localState.tasksPerMember = computeTasksPerMember(localState.allTasks, localState.totalCapacity); - localState.taskPairs = new TaskPairs(localState.allTasks * (localState.allTasks - 1) / 2); - localState.processIdToState = new HashMap<>(); localState.activeTaskToPrevMember = new HashMap<>(); localState.standbyTaskToPrevMember = new HashMap<>(); @@ -175,7 +172,7 @@ public class StickyTaskAssignor implements 
TaskAssignor { final Member prevMember = localState.activeTaskToPrevMember.get(task); if (prevMember != null && hasUnfulfilledQuota(prevMember)) { localState.processIdToState.get(prevMember.processId).addTask(prevMember.memberId, task, true); - updateHelpers(prevMember, task, true); + updateHelpers(prevMember, true); it.remove(); } } @@ -187,7 +184,7 @@ public class StickyTaskAssignor implements TaskAssignor { final Member prevMember = findMemberWithLeastLoad(prevMembers, task, true); if (prevMember != null && hasUnfulfilledQuota(prevMember)) { localState.processIdToState.get(prevMember.processId).addTask(prevMember.memberId, task, true); - updateHelpers(prevMember, task, true); + updateHelpers(prevMember, true); it.remove(); } } @@ -204,7 +201,7 @@ public class StickyTaskAssignor implements TaskAssignor { } localState.processIdToState.get(member.processId).addTask(member.memberId, task, true); it.remove(); - updateHelpers(member, task, true); + updateHelpers(member, true); } } @@ -221,20 +218,10 @@ public class StickyTaskAssignor implements TaskAssignor { if (members == null || members.isEmpty()) { return null; } - Set rightPairs = members.stream() - .filter(member -> localState.taskPairs.hasNewPair(taskId, localState.processIdToState.get(member.processId).assignedTasks())) - .collect(Collectors.toSet()); - if (rightPairs.isEmpty()) { - rightPairs = members; - } - Optional processWithLeastLoad = rightPairs.stream() + Optional processWithLeastLoad = members.stream() .map(member -> localState.processIdToState.get(member.processId)) .min(Comparator.comparingDouble(ProcessState::load)); - // processWithLeastLoad must be present at this point, but we do a double check - if (processWithLeastLoad.isEmpty()) { - return null; - } // if the same exact former member is needed if (returnSameMember) { return localState.standbyTaskToPrevMember.get(taskId).stream() @@ -275,8 +262,7 @@ public class StickyTaskAssignor implements TaskAssignor { // prev active task Member prevMember 
= localState.activeTaskToPrevMember.get(task); - if (prevMember != null && availableProcesses.contains(prevMember.processId) && isLoadBalanced(prevMember.processId) - && localState.taskPairs.hasNewPair(task, localState.processIdToState.get(prevMember.processId).assignedTasks())) { + if (prevMember != null && availableProcesses.contains(prevMember.processId) && isLoadBalanced(prevMember.processId)) { standby = prevMember; } @@ -304,7 +290,7 @@ public class StickyTaskAssignor implements TaskAssignor { } } localState.processIdToState.get(standby.processId).addTask(standby.memberId, task, false); - updateHelpers(standby, task, false); + updateHelpers(standby, false); } } @@ -323,10 +309,7 @@ public class StickyTaskAssignor implements TaskAssignor { return process.hasCapacity() || isLeastLoadedProcess; } - private void updateHelpers(final Member member, final TaskId taskId, final boolean isActive) { - // add all pair combinations: update taskPairs - localState.taskPairs.addPairs(taskId, localState.processIdToState.get(member.processId).assignedTasks()); - + private void updateHelpers(final Member member, final boolean isActive) { if (isActive) { // update task per process maybeUpdateTasksPerMember(localState.processIdToState.get(member.processId).activeTaskCount()); @@ -344,75 +327,6 @@ public class StickyTaskAssignor implements TaskAssignor { return tasksPerMember; } - private static class TaskPairs { - private final Set pairs; - private final int maxPairs; - - TaskPairs(final int maxPairs) { - this.maxPairs = maxPairs; - this.pairs = new HashSet<>(maxPairs); - } - - boolean hasNewPair(final TaskId task1, - final Set taskIds) { - if (pairs.size() == maxPairs) { - return false; - } - if (taskIds.size() == 0) { - return true; - } - for (final TaskId taskId : taskIds) { - if (!pairs.contains(pair(task1, taskId))) { - return true; - } - } - return false; - } - - void addPairs(final TaskId taskId, final Set assigned) { - for (final TaskId id : assigned) { - if 
(!id.equals(taskId)) - pairs.add(pair(id, taskId)); - } - } - - Pair pair(final TaskId task1, final TaskId task2) { - if (task1.compareTo(task2) < 0) { - return new Pair(task1, task2); - } - return new Pair(task2, task1); - } - - - private static class Pair { - private final TaskId task1; - private final TaskId task2; - - Pair(final TaskId task1, final TaskId task2) { - this.task1 = task1; - this.task2 = task2; - } - - @Override - public boolean equals(final Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - final Pair pair = (Pair) o; - return Objects.equals(task1, pair.task1) && - Objects.equals(task2, pair.task2); - } - - @Override - public int hashCode() { - return Objects.hash(task1, task2); - } - } - } - static class Member { private final String processId; private final String memberId; @@ -425,7 +339,6 @@ public class StickyTaskAssignor implements TaskAssignor { private static class LocalState { // helper data structures: - private TaskPairs taskPairs; Map activeTaskToPrevMember; Map> standbyTaskToPrevMember; Map processIdToState; diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java index c0493887266..ee6cc5e2ae9 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java @@ -32,12 +32,10 @@ import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; -import static java.util.Arrays.asList; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.junit.jupiter.api.Assertions.assertEquals; import static 
org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -581,86 +579,6 @@ public class StickyTaskAssignorTest { assertEquals(4, getAllActiveTaskCount(result, "member1")); } - @Test - public void shouldNotHaveSameAssignmentOnAnyTwoHosts() { - final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1"); - final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2"); - final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3"); - final AssignmentMemberSpec memberSpec4 = createAssignmentMemberSpec("process4"); - final List allMemberIds = asList("member1", "member2", "member3", "member4"); - Map members = mkMap( - mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3), mkEntry("member4", memberSpec4)); - - final GroupAssignment result = assignor.assign( - new GroupSpecImpl(members, - mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, "1"))), - new TopologyDescriberImpl(4, true, List.of("test-subtopology")) - ); - - for (final String memberId : allMemberIds) { - final List taskIds = getAllTaskIds(result, memberId); - for (final String otherMemberId : allMemberIds) { - if (!memberId.equals(otherMemberId)) { - assertNotEquals(taskIds, getAllTaskIds(result, otherMemberId)); - } - } - } - } - - @Test - public void shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousActiveTasks() { - final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", mkMap(mkEntry("test-subtopology", Sets.newSet(1, 2))), Map.of()); - final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", mkMap(mkEntry("test-subtopology", Sets.newSet(3))), Map.of()); - final AssignmentMemberSpec memberSpec3 = 
createAssignmentMemberSpec("process3", mkMap(mkEntry("test-subtopology", Sets.newSet(0))), Map.of()); - final AssignmentMemberSpec memberSpec4 = createAssignmentMemberSpec("process4"); - final List allMemberIds = asList("member1", "member2", "member3", "member4"); - Map members = mkMap( - mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3), mkEntry("member4", memberSpec4)); - - final GroupAssignment result = assignor.assign( - new GroupSpecImpl(members, - mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, "1"))), - new TopologyDescriberImpl(4, true, List.of("test-subtopology")) - ); - - for (final String memberId : allMemberIds) { - final List taskIds = getAllTaskIds(result, memberId); - for (final String otherMemberId : allMemberIds) { - if (!memberId.equals(otherMemberId)) { - assertNotEquals(taskIds, getAllTaskIds(result, otherMemberId)); - } - } - } - } - - @Test - public void shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousStandbyTasks() { - final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1", - mkMap(mkEntry("test-subtopology", Sets.newSet(1, 2))), mkMap(mkEntry("test-subtopology", Sets.newSet(3, 0)))); - final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2", - mkMap(mkEntry("test-subtopology", Sets.newSet(3, 0))), mkMap(mkEntry("test-subtopology", Sets.newSet(1, 2)))); - final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3"); - final AssignmentMemberSpec memberSpec4 = createAssignmentMemberSpec("process4"); - final List allMemberIds = asList("member1", "member2", "member3", "member4"); - Map members = mkMap( - mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3), mkEntry("member4", memberSpec4)); - - final GroupAssignment result = assignor.assign( - new GroupSpecImpl(members, - mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, "1"))), - new TopologyDescriberImpl(4, true, 
List.of("test-subtopology")) - ); - - for (final String memberId : allMemberIds) { - final List taskIds = getAllTaskIds(result, memberId); - for (final String otherMemberId : allMemberIds) { - if (!memberId.equals(otherMemberId)) { - assertNotEquals(taskIds, getAllTaskIds(result, otherMemberId)); - } - } - } - } - @Test public void shouldReBalanceTasksAcrossAllClientsWhenCapacityAndTaskCountTheSame() { final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3", mkMap(mkEntry("test-subtopology", Sets.newSet(0, 1, 2, 3))), Map.of()); @@ -1020,13 +938,6 @@ public class StickyTaskAssignorTest { return res; } - private List getAllTaskIds(GroupAssignment result, String... memberIds) { - List res = new ArrayList<>(); - res.addAll(getAllActiveTaskIds(result, memberIds)); - res.addAll(getAllStandbyTaskIds(result, memberIds)); - return res; - } - private Map> mergeAllStandbyTasks(GroupAssignment result, String... memberIds) { Map> res = new HashMap<>(); for (String memberId : memberIds) {