From 6247fd9eb36dfd5f61a90631c4f5d812c81e8019 Mon Sep 17 00:00:00 2001
From: Lucas Brutschy
Date: Wed, 3 Sep 2025 17:13:01 +0200
Subject: [PATCH] KAFKA-19478 [3/N]: Use heaps to discover the least loaded process (#20172)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The original implementation uses a linear search to find the least loaded
process in O(n). We can replace this with look-ups in a heap, which are
O(log(n)), as described below.

Active tasks: For active tasks, we can produce exactly the same assignment as
the original algorithm by first building a heap (by load) of all processes.
When we assign a task, we pick the head off the heap, assign the task to it,
update its load, and re-insert it into the heap in O(log(n)).

Standby tasks: For standby tasks, we cannot apply this optimization directly,
because of the order in which we assign tasks:

1. We first try to assign task A to a process that previously owned A.
2. If we did not find such a process, we assign A to the least loaded process.
3. We then try to assign task B to a process that previously owned B.
4. If we did not find such a process, we assign B to the least loaded process.
...

The problem is that we cannot efficiently maintain a heap (by load) throughout
this process, because finding and removing the process that previously owned A
(and B and ...) in the heap is O(n). We therefore need to change the order of
evaluation to be able to use a heap:

1. Try to assign all tasks A, B, ... to a process that previously owned the task.
2. Build a heap.
3. Assign all remaining tasks to the least loaded process that does not yet
   own the task. Since at most NumStandbyReplicas processes already own the
   task, we can do this by removing up to NumStandbyReplicas elements from the
   top of the heap in O(log(n)), so we get
   O(log(NumProcesses) * NumStandbyReplicas) per task.

Note that the change in order changes the resulting standby assignments
(although this difference does not show up in the existing unit tests). I
would argue that the new order of assignment will actually yield better
assignments, since the assignment will be more sticky, which has the potential
to reduce the amount of state we have to restore from the changelog topic
after reassignment.

In our worst-performing benchmark, this improves the runtime by ~107x.
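As an illustration of the approach described above, here is a minimal,
self-contained sketch in Java. It is not the code from this patch: the
ProcessState class and all names below are simplified stand-ins.

    import java.util.ArrayDeque;
    import java.util.Collection;
    import java.util.Comparator;
    import java.util.Deque;
    import java.util.HashSet;
    import java.util.List;
    import java.util.PriorityQueue;
    import java.util.Set;

    final class HeapAssignmentSketch {

        // Simplified stand-in for the assignor's per-process bookkeeping.
        static final class ProcessState {
            final String processId;
            final Set<String> tasks = new HashSet<>();
            final int capacity;

            ProcessState(final String processId, final int capacity) {
                this.processId = processId;
                this.capacity = capacity;
            }

            double load() {
                return (double) tasks.size() / capacity;
            }
        }

        // Active tasks: pop the least loaded process, assign, and re-insert,
        // all in O(log(NumProcesses)) per task.
        static void assignActive(final List<String> tasks, final Collection<ProcessState> processes) {
            final PriorityQueue<ProcessState> byLoad =
                new PriorityQueue<>(Comparator.comparingDouble(ProcessState::load));
            byLoad.addAll(processes);
            for (final String task : tasks) {
                final ProcessState leastLoaded = byLoad.poll();
                if (leastLoaded == null) {
                    break;                      // no processes available
                }
                leastLoaded.tasks.add(task);    // the load changes ...
                byLoad.add(leastLoaded);        // ... so re-insert to restore heap order
            }
        }

        // Standby tasks: at most numStandbyReplicas processes can already own
        // the task, so popping at most numStandbyReplicas + 1 elements is
        // enough to find one that does not own it yet.
        static boolean assignStandby(final String task,
                                     final int numStandbyReplicas,
                                     final PriorityQueue<ProcessState> byLoad) {
            final Deque<ProcessState> skipped = new ArrayDeque<>();
            boolean assigned = false;
            for (int i = 0; i <= numStandbyReplicas && !byLoad.isEmpty(); i++) {
                final ProcessState candidate = byLoad.poll();
                if (!candidate.tasks.contains(task)) {
                    candidate.tasks.add(task);
                    byLoad.add(candidate);      // re-insert with its new load
                    assigned = true;
                    break;
                }
                skipped.add(candidate);         // already owns the task; try the next one
            }
            byLoad.addAll(skipped);             // return the skipped processes to the heap
            return assigned;
        }
    }

The patch itself follows the same pattern (see the "Add it back to the queue
after updating its state" comments in the diff below); for standby tasks it
first exhausts the sticky, previous-owner assignments and only then builds the
heap, so the heap needs to be built once rather than maintained across both
phases.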
Reviewers: Bill Bejeck --- .../streams/assignor/StickyTaskAssignor.java | 257 ++++++++++------- .../assignor/StickyTaskAssignorTest.java | 259 ++++++++++++++++++ .../integration/RestoreIntegrationTest.java | 2 +- .../SmokeTestDriverIntegrationTest.java | 12 +- .../StandbyTaskCreationIntegrationTest.java | 2 +- .../utils/EmbeddedKafkaCluster.java | 30 +- 6 files changed, 451 insertions(+), 111 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java index 903631d0460..7ef5a382584 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java @@ -20,12 +20,14 @@ package org.apache.kafka.coordinator.group.streams.assignor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Optional; +import java.util.PriorityQueue; import java.util.Set; import java.util.stream.Collectors; @@ -45,14 +47,14 @@ public class StickyTaskAssignor implements TaskAssignor { @Override public GroupAssignment assign(final GroupSpec groupSpec, final TopologyDescriber topologyDescriber) throws TaskAssignorException { initialize(groupSpec, topologyDescriber); - GroupAssignment assignments = doAssign(groupSpec, topologyDescriber); + final GroupAssignment assignments = doAssign(groupSpec, topologyDescriber); localState = null; return assignments; } private GroupAssignment doAssign(final GroupSpec groupSpec, final TopologyDescriber topologyDescriber) { //active - Set activeTasks = taskIds(topologyDescriber, true); + final Set activeTasks = taskIds(topologyDescriber, true); assignActive(activeTasks); //standby @@ -60,7 +62,7 @@ public class StickyTaskAssignor implements TaskAssignor { groupSpec.assignmentConfigs().isEmpty() ? 
0 : Integer.parseInt(groupSpec.assignmentConfigs().get("num.standby.replicas")); if (numStandbyReplicas > 0) { - Set statefulTasks = taskIds(topologyDescriber, false); + final Set statefulTasks = taskIds(topologyDescriber, false); assignStandby(statefulTasks, numStandbyReplicas); } @@ -68,10 +70,10 @@ public class StickyTaskAssignor implements TaskAssignor { } private Set taskIds(final TopologyDescriber topologyDescriber, final boolean isActive) { - Set ret = new HashSet<>(); - for (String subtopology : topologyDescriber.subtopologies()) { + final Set ret = new HashSet<>(); + for (final String subtopology : topologyDescriber.subtopologies()) { if (isActive || topologyDescriber.isStateful(subtopology)) { - int numberOfPartitions = topologyDescriber.maxNumInputPartitions(subtopology); + final int numberOfPartitions = topologyDescriber.maxNumInputPartitions(subtopology); for (int i = 0; i < numberOfPartitions; i++) { ret.add(new TaskId(subtopology, i)); } @@ -83,8 +85,8 @@ public class StickyTaskAssignor implements TaskAssignor { private void initialize(final GroupSpec groupSpec, final TopologyDescriber topologyDescriber) { localState = new LocalState(); localState.allTasks = 0; - for (String subtopology : topologyDescriber.subtopologies()) { - int numberOfPartitions = topologyDescriber.maxNumInputPartitions(subtopology); + for (final String subtopology : topologyDescriber.subtopologies()) { + final int numberOfPartitions = topologyDescriber.maxNumInputPartitions(subtopology); localState.allTasks += numberOfPartitions; } localState.totalCapacity = groupSpec.members().size(); @@ -93,7 +95,7 @@ public class StickyTaskAssignor implements TaskAssignor { localState.processIdToState = new HashMap<>(); localState.activeTaskToPrevMember = new HashMap<>(); localState.standbyTaskToPrevMember = new HashMap<>(); - for (Map.Entry memberEntry : groupSpec.members().entrySet()) { + for (final Map.Entry memberEntry : groupSpec.members().entrySet()) { final String memberId = memberEntry.getKey(); final String processId = memberEntry.getValue().processId(); final Member member = new Member(processId, memberId); @@ -103,19 +105,19 @@ public class StickyTaskAssignor implements TaskAssignor { localState.processIdToState.get(processId).addMember(memberId); // prev active tasks - for (Map.Entry> entry : memberSpec.activeTasks().entrySet()) { - Set partitionNoSet = entry.getValue(); - for (int partitionNo : partitionNoSet) { + for (final Map.Entry> entry : memberSpec.activeTasks().entrySet()) { + final Set partitionNoSet = entry.getValue(); + for (final int partitionNo : partitionNoSet) { localState.activeTaskToPrevMember.put(new TaskId(entry.getKey(), partitionNo), member); } } // prev standby tasks - for (Map.Entry> entry : memberSpec.standbyTasks().entrySet()) { - Set partitionNoSet = entry.getValue(); - for (int partitionNo : partitionNoSet) { - TaskId taskId = new TaskId(entry.getKey(), partitionNo); - localState.standbyTaskToPrevMember.putIfAbsent(taskId, new HashSet<>()); + for (final Map.Entry> entry : memberSpec.standbyTasks().entrySet()) { + final Set partitionNoSet = entry.getValue(); + for (final int partitionNo : partitionNoSet) { + final TaskId taskId = new TaskId(entry.getKey(), partitionNo); + localState.standbyTaskToPrevMember.putIfAbsent(taskId, new ArrayList<>()); localState.standbyTaskToPrevMember.get(taskId).add(member); } } @@ -139,15 +141,15 @@ public class StickyTaskAssignor implements TaskAssignor { return set1; })); - for (String memberId : members) { - Map> activeTasks = new HashMap<>(); + 
for (final String memberId : members) { + final Map> activeTasks = new HashMap<>(); if (activeTasksAssignments.containsKey(memberId)) { - activeTasks = toCompactedTaskIds(activeTasksAssignments.get(memberId)); + activeTasks.putAll(toCompactedTaskIds(activeTasksAssignments.get(memberId))); } - Map> standByTasks = new HashMap<>(); + final Map> standByTasks = new HashMap<>(); if (standbyTasksAssignments.containsKey(memberId)) { - standByTasks = toCompactedTaskIds(standbyTasksAssignments.get(memberId)); + standByTasks.putAll(toCompactedTaskIds(standbyTasksAssignments.get(memberId))); } memberAssignments.put(memberId, new MemberAssignment(activeTasks, standByTasks, new HashMap<>())); } @@ -156,8 +158,8 @@ public class StickyTaskAssignor implements TaskAssignor { } private Map> toCompactedTaskIds(final Set taskIds) { - Map> ret = new HashMap<>(); - for (TaskId taskId : taskIds) { + final Map> ret = new HashMap<>(); + for (final TaskId taskId : taskIds) { ret.putIfAbsent(taskId.subtopologyId(), new HashSet<>()); ret.get(taskId.subtopologyId()).add(taskId.partition()); } @@ -167,42 +169,47 @@ public class StickyTaskAssignor implements TaskAssignor { private void assignActive(final Set activeTasks) { // 1. re-assigning existing active tasks to clients that previously had the same active tasks - for (Iterator it = activeTasks.iterator(); it.hasNext();) { + for (final Iterator it = activeTasks.iterator(); it.hasNext();) { final TaskId task = it.next(); final Member prevMember = localState.activeTaskToPrevMember.get(task); if (prevMember != null && hasUnfulfilledQuota(prevMember)) { - localState.processIdToState.get(prevMember.processId).addTask(prevMember.memberId, task, true); - updateHelpers(prevMember, true); + final ProcessState processState = localState.processIdToState.get(prevMember.processId); + processState.addTask(prevMember.memberId, task, true); + maybeUpdateTasksPerMember(processState.activeTaskCount()); it.remove(); } } // 2. re-assigning tasks to clients that previously have seen the same task (as standby task) - for (Iterator it = activeTasks.iterator(); it.hasNext();) { + for (final Iterator it = activeTasks.iterator(); it.hasNext();) { final TaskId task = it.next(); - final Set prevMembers = localState.standbyTaskToPrevMember.get(task); - final Member prevMember = findMemberWithLeastLoad(prevMembers, task, true); + final ArrayList prevMembers = localState.standbyTaskToPrevMember.get(task); + final Member prevMember = findPrevMemberWithLeastLoad(prevMembers, null); if (prevMember != null && hasUnfulfilledQuota(prevMember)) { - localState.processIdToState.get(prevMember.processId).addTask(prevMember.memberId, task, true); - updateHelpers(prevMember, true); + final ProcessState processState = localState.processIdToState.get(prevMember.processId); + processState.addTask(prevMember.memberId, task, true); + maybeUpdateTasksPerMember(processState.activeTaskCount()); it.remove(); } } // 3. 
assign any remaining unassigned tasks - for (Iterator it = activeTasks.iterator(); it.hasNext();) { + final PriorityQueue processByLoad = new PriorityQueue<>(Comparator.comparingDouble(ProcessState::load)); + processByLoad.addAll(localState.processIdToState.values()); + for (final Iterator it = activeTasks.iterator(); it.hasNext();) { final TaskId task = it.next(); - final Set allMembers = localState.processIdToState.entrySet().stream().flatMap(entry -> entry.getValue().memberToTaskCounts().keySet().stream() - .map(memberId -> new Member(entry.getKey(), memberId))).collect(Collectors.toSet()); - final Member member = findMemberWithLeastLoad(allMembers, task, false); + final ProcessState processWithLeastLoad = processByLoad.poll(); + if (processWithLeastLoad == null) { + throw new TaskAssignorException("No process available to assign active task {}." + task); + } + final String member = memberWithLeastLoad(processWithLeastLoad); if (member == null) { - log.error("Unable to assign active task {} to any member.", task); throw new TaskAssignorException("No member available to assign active task {}." + task); } - localState.processIdToState.get(member.processId).addTask(member.memberId, task, true); + processWithLeastLoad.addTask(member, task, true); it.remove(); - updateHelpers(member, true); - + maybeUpdateTasksPerMember(processWithLeastLoad.activeTaskCount()); + processByLoad.add(processWithLeastLoad); // Add it back to the queue after updating its state } } @@ -214,29 +221,75 @@ public class StickyTaskAssignor implements TaskAssignor { } } - private Member findMemberWithLeastLoad(final Set members, TaskId taskId, final boolean returnSameMember) { + private boolean assignStandbyToMemberWithLeastLoad(PriorityQueue queue, TaskId taskId) { + final ProcessState processWithLeastLoad = queue.poll(); + if (processWithLeastLoad == null) { + return false; + } + boolean found = false; + if (!processWithLeastLoad.hasTask(taskId)) { + final String memberId = memberWithLeastLoad(processWithLeastLoad); + if (memberId != null) { + processWithLeastLoad.addTask(memberId, taskId, false); + found = true; + } + } else if (!queue.isEmpty()) { + found = assignStandbyToMemberWithLeastLoad(queue, taskId); + } + queue.add(processWithLeastLoad); // Add it back to the queue after updating its state + return found; + } + + /** + * Finds the previous member with the least load for a given task. + * + * @param members The list of previous members owning the task. + * @param taskId The taskId, to check if the previous member already has the task. Can be null, if we assign it + * for the first time (e.g., during active task assignment). + * + * @return Previous member with the least load that deoes not have the task, or null if no such member exists. 
+ */ + private Member findPrevMemberWithLeastLoad(final ArrayList members, final TaskId taskId) { if (members == null || members.isEmpty()) { return null; } - Optional processWithLeastLoad = members.stream() - .map(member -> localState.processIdToState.get(member.processId)) - .min(Comparator.comparingDouble(ProcessState::load)); - // if the same exact former member is needed - if (returnSameMember) { - return localState.standbyTaskToPrevMember.get(taskId).stream() - .filter(standby -> standby.processId.equals(processWithLeastLoad.get().processId())) - .findFirst() - .orElseGet(() -> memberWithLeastLoad(processWithLeastLoad.get())); + Member candidate = members.get(0); + final ProcessState candidateProcessState = localState.processIdToState.get(candidate.processId); + double candidateProcessLoad = candidateProcessState.load(); + double candidateMemberLoad = candidateProcessState.memberToTaskCounts().get(candidate.memberId); + for (int i = 1; i < members.size(); i++) { + final Member member = members.get(i); + final ProcessState processState = localState.processIdToState.get(member.processId); + final double newProcessLoad = processState.load(); + if (newProcessLoad < candidateProcessLoad && (taskId == null || !processState.hasTask(taskId))) { + final double newMemberLoad = processState.memberToTaskCounts().get(member.memberId); + if (newMemberLoad < candidateMemberLoad) { + candidateProcessLoad = newProcessLoad; + candidateMemberLoad = newMemberLoad; + candidate = member; + } + } } - return memberWithLeastLoad(processWithLeastLoad.get()); + + if (taskId == null || !candidateProcessState.hasTask(taskId)) { + return candidate; + } + return null; } - private Member memberWithLeastLoad(final ProcessState processWithLeastLoad) { - Optional memberWithLeastLoad = processWithLeastLoad.memberToTaskCounts().entrySet().stream() + private String memberWithLeastLoad(final ProcessState processWithLeastLoad) { + final Map members = processWithLeastLoad.memberToTaskCounts(); + if (members.isEmpty()) { + return null; + } + if (members.size() == 1) { + return members.keySet().iterator().next(); + } + final Optional memberWithLeastLoad = processWithLeastLoad.memberToTaskCounts().entrySet().stream() .min(Map.Entry.comparingByValue()) .map(Map.Entry::getKey); - return memberWithLeastLoad.map(memberId -> new Member(processWithLeastLoad.processId(), memberId)).orElse(null); + return memberWithLeastLoad.orElse(null); } private boolean hasUnfulfilledQuota(final Member member) { @@ -244,55 +297,49 @@ public class StickyTaskAssignor implements TaskAssignor { } private void assignStandby(final Set standbyTasks, final int numStandbyReplicas) { - for (TaskId task : standbyTasks) { + final ArrayList toLeastLoaded = new ArrayList<>(standbyTasks.size() * numStandbyReplicas); + for (final TaskId task : standbyTasks) { for (int i = 0; i < numStandbyReplicas; i++) { - final Set availableProcesses = localState.processIdToState.values().stream() - .filter(process -> !process.hasTask(task)) - .map(ProcessState::processId) - .collect(Collectors.toSet()); - - if (availableProcesses.isEmpty()) { - log.warn("{} There is not enough available capacity. 
" + - "You should increase the number of threads and/or application instances to maintain the requested number of standby replicas.", - errorMessage(numStandbyReplicas, i, task)); - break; - } - Member standby = null; - // prev active task - Member prevMember = localState.activeTaskToPrevMember.get(task); - if (prevMember != null && availableProcesses.contains(prevMember.processId) && isLoadBalanced(prevMember.processId)) { - standby = prevMember; + final Member prevMember = localState.activeTaskToPrevMember.get(task); + if (prevMember != null) { + final ProcessState prevMemberProcessState = localState.processIdToState.get(prevMember.processId); + if (!prevMemberProcessState.hasTask(task) && isLoadBalanced(prevMemberProcessState)) { + prevMemberProcessState.addTask(prevMember.memberId, task, false); + continue; + } } // prev standby tasks - if (standby == null) { - final Set prevMembers = localState.standbyTaskToPrevMember.get(task); - if (prevMembers != null && !prevMembers.isEmpty()) { - prevMembers.removeIf(member -> !availableProcesses.contains(member.processId)); - prevMember = findMemberWithLeastLoad(prevMembers, task, true); - if (prevMember != null && isLoadBalanced(prevMember.processId)) { - standby = prevMember; + final ArrayList prevMembers = localState.standbyTaskToPrevMember.get(task); + if (prevMembers != null && !prevMembers.isEmpty()) { + final Member prevMember2 = findPrevMemberWithLeastLoad(prevMembers, task); + if (prevMember2 != null) { + final ProcessState prevMemberProcessState = localState.processIdToState.get(prevMember2.processId); + if (isLoadBalanced(prevMemberProcessState)) { + prevMemberProcessState.addTask(prevMember2.memberId, task, false); + continue; } } } - // others - if (standby == null) { - final Set availableMembers = availableProcesses.stream() - .flatMap(pId -> localState.processIdToState.get(pId).memberToTaskCounts().keySet().stream() - .map(mId -> new Member(pId, mId))).collect(Collectors.toSet()); - standby = findMemberWithLeastLoad(availableMembers, task, false); - if (standby == null) { - log.warn("{} Error in standby task assignment!", errorMessage(numStandbyReplicas, i, task)); - break; - } - } - localState.processIdToState.get(standby.processId).addTask(standby.memberId, task, false); - updateHelpers(standby, false); + toLeastLoaded.add(new StandbyToAssign(task, numStandbyReplicas - i)); + break; } + } + final PriorityQueue processByLoad = new PriorityQueue<>(Comparator.comparingDouble(ProcessState::load)); + processByLoad.addAll(localState.processIdToState.values()); + for (final StandbyToAssign toAssign : toLeastLoaded) { + for (int i = 0; i < toAssign.remainingReplicas; i++) { + if (!assignStandbyToMemberWithLeastLoad(processByLoad, toAssign.taskId)) { + log.warn("{} There is not enough available capacity. 
" + + "You should increase the number of threads and/or application instances to maintain the requested number of standby replicas.", + errorMessage(numStandbyReplicas, i, toAssign.taskId)); + break; + } + } } } @@ -301,21 +348,13 @@ public class StickyTaskAssignor implements TaskAssignor { " of " + numStandbyReplicas + " standby tasks for task [" + task + "]."; } - private boolean isLoadBalanced(final String processId) { - final ProcessState process = localState.processIdToState.get(processId); + private boolean isLoadBalanced(final ProcessState process) { final double load = process.load(); - boolean isLeastLoadedProcess = localState.processIdToState.values().stream() + final boolean isLeastLoadedProcess = localState.processIdToState.values().stream() .allMatch(p -> p.load() >= load); return process.hasCapacity() || isLeastLoadedProcess; } - private void updateHelpers(final Member member, final boolean isActive) { - if (isActive) { - // update task per process - maybeUpdateTasksPerMember(localState.processIdToState.get(member.processId).activeTaskCount()); - } - } - private static int computeTasksPerMember(final int numberOfTasks, final int numberOfMembers) { if (numberOfMembers == 0) { return 0; @@ -326,7 +365,17 @@ public class StickyTaskAssignor implements TaskAssignor { } return tasksPerMember; } - + + static class StandbyToAssign { + private final TaskId taskId; + private final int remainingReplicas; + + public StandbyToAssign(final TaskId taskId, final int remainingReplicas) { + this.taskId = taskId; + this.remainingReplicas = remainingReplicas; + } + } + static class Member { private final String processId; private final String memberId; @@ -340,11 +389,11 @@ public class StickyTaskAssignor implements TaskAssignor { private static class LocalState { // helper data structures: Map activeTaskToPrevMember; - Map> standbyTaskToPrevMember; + Map> standbyTaskToPrevMember; Map processIdToState; int allTasks; int totalCapacity; int tasksPerMember; } -} \ No newline at end of file +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java index 5d26f9a9e12..b4fa9c4db99 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java @@ -833,6 +833,265 @@ public class StickyTaskAssignorTest { assertEquals(2, getAllActiveTaskIds(result, "newMember").size()); } + @Test + public void shouldHandleLargeNumberOfTasksWithStandbyAssignment() { + final int numTasks = 100; + final int numClients = 5; + final int numStandbyReplicas = 2; + + Map members = new HashMap<>(); + for (int i = 0; i < numClients; i++) { + members.put("member" + i, createAssignmentMemberSpec("process" + i)); + } + + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, String.valueOf(numStandbyReplicas)))), + new TopologyDescriberImpl(numTasks, true, List.of("test-subtopology")) + ); + + // Verify all active tasks are assigned + Set allActiveTasks = new HashSet<>(); + for (String memberId : result.members().keySet()) { + List memberActiveTasks = getAllActiveTaskIds(result, memberId); + allActiveTasks.addAll(memberActiveTasks); + } + assertEquals(numTasks, allActiveTasks.size()); + + // Verify standby tasks are 
assigned (should be numTasks * numStandbyReplicas total) + Set allStandbyTasks = new HashSet<>(); + for (String memberId : result.members().keySet()) { + List memberStandbyTasks = getAllStandbyTaskIds(result, memberId); + allStandbyTasks.addAll(memberStandbyTasks); + } + // With 5 clients and 2 standby replicas, we should have at least some standby tasks + assertTrue(allStandbyTasks.size() > 0, "Should have some standby tasks assigned"); + // Maximum possible = numTasks * min(numStandbyReplicas, numClients - 1) = 100 * 2 = 200 + int maxPossibleStandbyTasks = numTasks * Math.min(numStandbyReplicas, numClients - 1); + assertTrue(allStandbyTasks.size() <= maxPossibleStandbyTasks, + "Should not exceed maximum possible standby tasks: " + maxPossibleStandbyTasks); + + // Verify no client has both active and standby for the same task + for (String memberId : result.members().keySet()) { + Set memberActiveTasks = new HashSet<>(getAllActiveTaskIds(result, memberId)); + Set memberStandbyTasks = new HashSet<>(getAllStandbyTaskIds(result, memberId)); + memberActiveTasks.retainAll(memberStandbyTasks); + assertTrue(memberActiveTasks.isEmpty(), "Client " + memberId + " has both active and standby for same task"); + } + + // Verify load distribution is reasonable + int minActiveTasks = Integer.MAX_VALUE; + int maxActiveTasks = 0; + for (String memberId : result.members().keySet()) { + int activeTaskCount = getAllActiveTaskCount(result, memberId); + minActiveTasks = Math.min(minActiveTasks, activeTaskCount); + maxActiveTasks = Math.max(maxActiveTasks, activeTaskCount); + } + // With 100 tasks and 5 clients, each should have 20 tasks + assertEquals(20, minActiveTasks); + assertEquals(20, maxActiveTasks); + + // Verify standby task distribution is reasonable + int minStandbyTasks = Integer.MAX_VALUE; + int maxStandbyTasks = 0; + for (String memberId : result.members().keySet()) { + int standbyTaskCount = getAllStandbyTaskIds(result, memberId).size(); + minStandbyTasks = Math.min(minStandbyTasks, standbyTaskCount); + maxStandbyTasks = Math.max(maxStandbyTasks, standbyTaskCount); + } + // Each client should have some standby tasks, but not necessarily equal distribution + assertTrue(minStandbyTasks >= 0); + assertTrue(maxStandbyTasks > 0); + } + + @Test + public void shouldHandleOddNumberOfClientsWithStandbyTasks() { + // Test with odd number of clients (7) and even number of tasks (14) + final int numTasks = 14; + final int numClients = 7; + final int numStandbyReplicas = 1; + + Map members = new HashMap<>(); + for (int i = 0; i < numClients; i++) { + members.put("member" + i, createAssignmentMemberSpec("process" + i)); + } + + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, String.valueOf(numStandbyReplicas)))), + new TopologyDescriberImpl(numTasks, true, List.of("test-subtopology")) + ); + + // Verify all active tasks are assigned + Set allActiveTasks = new HashSet<>(); + for (String memberId : result.members().keySet()) { + List memberActiveTasks = getAllActiveTaskIds(result, memberId); + allActiveTasks.addAll(memberActiveTasks); + } + assertEquals(numTasks, allActiveTasks.size()); + + // Verify standby tasks are assigned + Set allStandbyTasks = new HashSet<>(); + for (String memberId : result.members().keySet()) { + List memberStandbyTasks = getAllStandbyTaskIds(result, memberId); + allStandbyTasks.addAll(memberStandbyTasks); + } + assertEquals(numTasks * numStandbyReplicas, allStandbyTasks.size()); + + // With 14 tasks and 7 clients, each 
client should have 2 active tasks + int expectedTasksPerClient = numTasks / numClients; // 14 / 7 = 2 + int remainder = numTasks % numClients; // 14 % 7 = 0 + + int clientsWithExpectedTasks = 0; + int clientsWithOneMoreTask = 0; + for (String memberId : result.members().keySet()) { + int activeTaskCount = getAllActiveTaskCount(result, memberId); + if (activeTaskCount == expectedTasksPerClient) { + clientsWithExpectedTasks++; + } else if (activeTaskCount == expectedTasksPerClient + 1) { + clientsWithOneMoreTask++; + } + } + assertEquals(numClients - remainder, clientsWithExpectedTasks); // 7 clients should have 2 tasks + assertEquals(remainder, clientsWithOneMoreTask); // 0 clients should have 3 tasks + } + + @Test + public void shouldHandleHighStandbyReplicaCount() { + // Test with high number of standby replicas (5) and limited clients (3) + final int numTasks = 6; + final int numClients = 3; + final int numStandbyReplicas = 5; + + Map members = new HashMap<>(); + for (int i = 0; i < numClients; i++) { + members.put("member" + i, createAssignmentMemberSpec("process" + i)); + } + + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, String.valueOf(numStandbyReplicas)))), + new TopologyDescriberImpl(numTasks, true, List.of("test-subtopology")) + ); + + // Verify all active tasks are assigned + Set allActiveTasks = new HashSet<>(); + for (String memberId : result.members().keySet()) { + List memberActiveTasks = getAllActiveTaskIds(result, memberId); + allActiveTasks.addAll(memberActiveTasks); + } + assertEquals(numTasks, allActiveTasks.size()); + + // With only 3 clients and 5 standby replicas, not all standby replicas can be assigned + // since each client can only hold standby tasks for tasks it doesn't have active + Set allStandbyTasks = new HashSet<>(); + for (String memberId : result.members().keySet()) { + List memberStandbyTasks = getAllStandbyTaskIds(result, memberId); + allStandbyTasks.addAll(memberStandbyTasks); + } + + // Maximum possible = numTasks * min(numStandbyReplicas, numClients - 1) = 6 * 2 = 12 + int maxPossibleStandbyTasks = numTasks * Math.min(numStandbyReplicas, numClients - 1); + assertTrue(allStandbyTasks.size() <= maxPossibleStandbyTasks); + assertTrue(allStandbyTasks.size() > 0); // Should assign at least some standby tasks + } + + @Test + public void shouldHandleLargeNumberOfSubtopologiesWithStandbyTasks() { + // Test with many subtopologies (10) each with different numbers of tasks + final int numSubtopologies = 10; + final int numClients = 4; + final int numStandbyReplicas = 1; + + List subtopologies = new ArrayList<>(); + for (int i = 0; i < numSubtopologies; i++) { + subtopologies.add("subtopology-" + i); + } + + Map members = new HashMap<>(); + for (int i = 0; i < numClients; i++) { + members.put("member" + i, createAssignmentMemberSpec("process" + i)); + } + + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, String.valueOf(numStandbyReplicas)))), + new TopologyDescriberImpl(5, true, subtopologies) // 5 tasks per subtopology + ); + + // Verify all subtopologies have tasks assigned + Set subtopologiesWithTasks = new HashSet<>(); + for (String memberId : result.members().keySet()) { + MemberAssignment member = result.members().get(memberId); + subtopologiesWithTasks.addAll(member.activeTasks().keySet()); + } + assertEquals(numSubtopologies, subtopologiesWithTasks.size()); + + // Verify standby tasks are assigned across subtopologies + 
Set subtopologiesWithStandbyTasks = new HashSet<>(); + for (String memberId : result.members().keySet()) { + MemberAssignment member = result.members().get(memberId); + subtopologiesWithStandbyTasks.addAll(member.standbyTasks().keySet()); + } + assertEquals(numSubtopologies, subtopologiesWithStandbyTasks.size()); + } + + @Test + public void shouldHandleEdgeCaseWithSingleClientAndMultipleStandbyReplicas() { + // Test edge case: single client with multiple standby replicas + final int numTasks = 10; + final int numStandbyReplicas = 3; + + Map members = mkMap( + mkEntry("member1", createAssignmentMemberSpec("process1")) + ); + + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, String.valueOf(numStandbyReplicas)))), + new TopologyDescriberImpl(numTasks, true, List.of("test-subtopology")) + ); + + // Single client should get all active tasks + assertEquals(numTasks, getAllActiveTaskCount(result, "member1")); + + // No standby tasks should be assigned since there's only one client + // (standby tasks can't be assigned to the same client as active tasks) + assertTrue(getAllStandbyTaskIds(result, "member1").isEmpty()); + } + + @Test + public void shouldHandleEdgeCaseWithMoreStandbyReplicasThanAvailableClients() { + // Test edge case: more standby replicas than available clients + final int numTasks = 4; + final int numClients = 2; + final int numStandbyReplicas = 5; // More than available clients + + Map members = new HashMap<>(); + for (int i = 0; i < numClients; i++) { + members.put("member" + i, createAssignmentMemberSpec("process" + i)); + } + + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, String.valueOf(numStandbyReplicas)))), + new TopologyDescriberImpl(numTasks, true, List.of("test-subtopology")) + ); + + // Verify all active tasks are assigned + Set allActiveTasks = new HashSet<>(); + for (String memberId : result.members().keySet()) { + List memberActiveTasks = getAllActiveTaskIds(result, memberId); + allActiveTasks.addAll(memberActiveTasks); + } + assertEquals(numTasks, allActiveTasks.size()); + + // With only 2 clients, maximum standby tasks per task = 1 (since each client has active tasks) + Set allStandbyTasks = new HashSet<>(); + for (String memberId : result.members().keySet()) { + List memberStandbyTasks = getAllStandbyTaskIds(result, memberId); + allStandbyTasks.addAll(memberStandbyTasks); + } + + // Maximum possible = numTasks * 1 = 4 + assertEquals(numTasks, allStandbyTasks.size()); + } + + private int getAllActiveTaskCount(GroupAssignment result, String... 
memberIds) { int size = 0; for (String memberId : memberIds) { diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java index fbd9a3a14b1..7370d488757 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java @@ -573,7 +573,7 @@ public class RestoreIntegrationTest { createStateForRestoration(inputStream, 0); if (useNewProtocol) { - CLUSTER.setStandbyReplicas(appId, 1); + CLUSTER.setGroupStandbyReplicas(appId, 1); } final Properties props1 = props(stateUpdaterEnabled); diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java index c40b3433a91..d9df2077b2c 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java @@ -140,15 +140,19 @@ public class SmokeTestDriverIntegrationTest { final Properties props = new Properties(); + final String appId = safeUniqueTestName(testInfo); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, safeUniqueTestName(testInfo)); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); props.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled); props.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled); - // decrease the session timeout so that we can trigger the rebalance soon after old client left closed - props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000); - props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500); if (streamsProtocolEnabled) { props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault())); + // decrease the session timeout so that we can trigger the rebalance soon after old client left closed + CLUSTER.setGroupSessionTimeout(appId, 10000); + CLUSTER.setGroupHeartbeatTimeout(appId, 1000); + } else { + // decrease the session timeout so that we can trigger the rebalance soon after old client left closed + props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000); } // cycle out Streams instances as long as the test is running. 
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java index ec48b8b3634..8c8ef3dae9c 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java @@ -99,7 +99,7 @@ public class StandbyTaskCreationIntegrationTest { streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class); if (streamsProtocolEnabled) { streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault())); - CLUSTER.setStandbyReplicas("app-" + safeTestName, 1); + CLUSTER.setGroupStandbyReplicas("app-" + safeTestName, 1); } else { streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); } diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java index f425f8365ee..1de7a45bfce 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java @@ -401,6 +401,8 @@ public class EmbeddedKafkaCluster { private void addDefaultBrokerPropsIfAbsent(final Properties brokerConfig) { brokerConfig.putIfAbsent(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, 2 * 1024 * 1024L); + brokerConfig.putIfAbsent(GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, "100"); + brokerConfig.putIfAbsent(GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, "100"); brokerConfig.putIfAbsent(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, "0"); brokerConfig.putIfAbsent(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "0"); brokerConfig.putIfAbsent(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "5"); @@ -439,7 +441,33 @@ public class EmbeddedKafkaCluster { } } - public void setStandbyReplicas(final String groupId, final int numStandbyReplicas) { + public void setGroupSessionTimeout(final String groupId, final int sessionTimeoutMs) { + try (final Admin adminClient = createAdminClient()) { + adminClient.incrementalAlterConfigs( + Map.of( + new ConfigResource(ConfigResource.Type.GROUP, groupId), + List.of(new AlterConfigOp(new ConfigEntry(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, String.valueOf(sessionTimeoutMs)), AlterConfigOp.OpType.SET)) + ) + ).all().get(); + } catch (final InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + public void setGroupHeartbeatTimeout(final String groupId, final int heartbeatTimeoutMs) { + try (final Admin adminClient = createAdminClient()) { + adminClient.incrementalAlterConfigs( + Map.of( + new ConfigResource(ConfigResource.Type.GROUP, groupId), + List.of(new AlterConfigOp(new ConfigEntry(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, String.valueOf(heartbeatTimeoutMs)), AlterConfigOp.OpType.SET)) + ) + ).all().get(); + } catch (final InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + public void setGroupStandbyReplicas(final String groupId, 
final int numStandbyReplicas) { try (final Admin adminClient = createAdminClient()) { adminClient.incrementalAlterConfigs( Map.of(