diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java
index 903631d0460..7ef5a382584 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java
@@ -20,12 +20,14 @@ package org.apache.kafka.coordinator.group.streams.assignor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Optional;
+import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -45,14 +47,14 @@ public class StickyTaskAssignor implements TaskAssignor {
     @Override
     public GroupAssignment assign(final GroupSpec groupSpec, final TopologyDescriber topologyDescriber) throws TaskAssignorException {
         initialize(groupSpec, topologyDescriber);
-        GroupAssignment assignments = doAssign(groupSpec, topologyDescriber);
+        final GroupAssignment assignments = doAssign(groupSpec, topologyDescriber);
         localState = null;
         return assignments;
     }
 
     private GroupAssignment doAssign(final GroupSpec groupSpec, final TopologyDescriber topologyDescriber) {
         //active
-        Set<TaskId> activeTasks = taskIds(topologyDescriber, true);
+        final Set<TaskId> activeTasks = taskIds(topologyDescriber, true);
         assignActive(activeTasks);
 
         //standby
@@ -60,7 +62,7 @@ public class StickyTaskAssignor implements TaskAssignor {
             groupSpec.assignmentConfigs().isEmpty() ? 0
                 : Integer.parseInt(groupSpec.assignmentConfigs().get("num.standby.replicas"));
         if (numStandbyReplicas > 0) {
-            Set<TaskId> statefulTasks = taskIds(topologyDescriber, false);
+            final Set<TaskId> statefulTasks = taskIds(topologyDescriber, false);
             assignStandby(statefulTasks, numStandbyReplicas);
         }
 
@@ -68,10 +70,10 @@ public class StickyTaskAssignor implements TaskAssignor {
     }
 
     private Set<TaskId> taskIds(final TopologyDescriber topologyDescriber, final boolean isActive) {
-        Set<TaskId> ret = new HashSet<>();
-        for (String subtopology : topologyDescriber.subtopologies()) {
+        final Set<TaskId> ret = new HashSet<>();
+        for (final String subtopology : topologyDescriber.subtopologies()) {
             if (isActive || topologyDescriber.isStateful(subtopology)) {
-                int numberOfPartitions = topologyDescriber.maxNumInputPartitions(subtopology);
+                final int numberOfPartitions = topologyDescriber.maxNumInputPartitions(subtopology);
                 for (int i = 0; i < numberOfPartitions; i++) {
                     ret.add(new TaskId(subtopology, i));
                 }
@@ -83,8 +85,8 @@ public class StickyTaskAssignor implements TaskAssignor {
     private void initialize(final GroupSpec groupSpec, final TopologyDescriber topologyDescriber) {
         localState = new LocalState();
         localState.allTasks = 0;
-        for (String subtopology : topologyDescriber.subtopologies()) {
-            int numberOfPartitions = topologyDescriber.maxNumInputPartitions(subtopology);
+        for (final String subtopology : topologyDescriber.subtopologies()) {
+            final int numberOfPartitions = topologyDescriber.maxNumInputPartitions(subtopology);
             localState.allTasks += numberOfPartitions;
         }
         localState.totalCapacity = groupSpec.members().size();
@@ -93,7 +95,7 @@ public class StickyTaskAssignor implements TaskAssignor {
         localState.processIdToState = new HashMap<>();
         localState.activeTaskToPrevMember = new HashMap<>();
         localState.standbyTaskToPrevMember = new HashMap<>();
-        for (Map.Entry<String, AssignmentMemberSpec> memberEntry : groupSpec.members().entrySet()) {
+        for (final Map.Entry<String, AssignmentMemberSpec> memberEntry : groupSpec.members().entrySet()) {
             final String memberId = memberEntry.getKey();
             final String processId = memberEntry.getValue().processId();
             final Member member = new Member(processId, memberId);
@@ -103,19 +105,19 @@ public class StickyTaskAssignor implements TaskAssignor {
             localState.processIdToState.get(processId).addMember(memberId);
 
             // prev active tasks
-            for (Map.Entry<String, Set<Integer>> entry : memberSpec.activeTasks().entrySet()) {
-                Set<Integer> partitionNoSet = entry.getValue();
-                for (int partitionNo : partitionNoSet) {
+            for (final Map.Entry<String, Set<Integer>> entry : memberSpec.activeTasks().entrySet()) {
+                final Set<Integer> partitionNoSet = entry.getValue();
+                for (final int partitionNo : partitionNoSet) {
                     localState.activeTaskToPrevMember.put(new TaskId(entry.getKey(), partitionNo), member);
                 }
             }
 
             // prev standby tasks
-            for (Map.Entry<String, Set<Integer>> entry : memberSpec.standbyTasks().entrySet()) {
-                Set<Integer> partitionNoSet = entry.getValue();
-                for (int partitionNo : partitionNoSet) {
-                    TaskId taskId = new TaskId(entry.getKey(), partitionNo);
-                    localState.standbyTaskToPrevMember.putIfAbsent(taskId, new HashSet<>());
+            for (final Map.Entry<String, Set<Integer>> entry : memberSpec.standbyTasks().entrySet()) {
+                final Set<Integer> partitionNoSet = entry.getValue();
+                for (final int partitionNo : partitionNoSet) {
+                    final TaskId taskId = new TaskId(entry.getKey(), partitionNo);
+                    localState.standbyTaskToPrevMember.putIfAbsent(taskId, new ArrayList<>());
                     localState.standbyTaskToPrevMember.get(taskId).add(member);
                 }
             }
@@ -139,15 +141,15 @@ public class StickyTaskAssignor implements TaskAssignor {
                     return set1;
                 }));
 
-        for (String memberId : members) {
-            Map<String, Set<Integer>> activeTasks = new HashMap<>();
+        for (final String memberId : members) {
+            final Map<String, Set<Integer>> activeTasks = new HashMap<>();
             if (activeTasksAssignments.containsKey(memberId)) {
-                activeTasks = toCompactedTaskIds(activeTasksAssignments.get(memberId));
+                activeTasks.putAll(toCompactedTaskIds(activeTasksAssignments.get(memberId)));
             }
-            Map<String, Set<Integer>> standByTasks = new HashMap<>();
+            final Map<String, Set<Integer>> standByTasks = new HashMap<>();
             if (standbyTasksAssignments.containsKey(memberId)) {
-                standByTasks = toCompactedTaskIds(standbyTasksAssignments.get(memberId));
+                standByTasks.putAll(toCompactedTaskIds(standbyTasksAssignments.get(memberId)));
             }
             memberAssignments.put(memberId, new MemberAssignment(activeTasks, standByTasks, new HashMap<>()));
         }
@@ -156,8 +158,8 @@ public class StickyTaskAssignor implements TaskAssignor {
     private Map<String, Set<Integer>> toCompactedTaskIds(final Set<TaskId> taskIds) {
-        Map<String, Set<Integer>> ret = new HashMap<>();
-        for (TaskId taskId : taskIds) {
+        final Map<String, Set<Integer>> ret = new HashMap<>();
+        for (final TaskId taskId : taskIds) {
             ret.putIfAbsent(taskId.subtopologyId(), new HashSet<>());
             ret.get(taskId.subtopologyId()).add(taskId.partition());
         }
@@ -167,42 +169,47 @@ public class StickyTaskAssignor implements TaskAssignor {
     private void assignActive(final Set<TaskId> activeTasks) {
         // 1. re-assigning existing active tasks to clients that previously had the same active tasks
-        for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
+        for (final Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
             final TaskId task = it.next();
             final Member prevMember = localState.activeTaskToPrevMember.get(task);
             if (prevMember != null && hasUnfulfilledQuota(prevMember)) {
-                localState.processIdToState.get(prevMember.processId).addTask(prevMember.memberId, task, true);
-                updateHelpers(prevMember, true);
+                final ProcessState processState = localState.processIdToState.get(prevMember.processId);
+                processState.addTask(prevMember.memberId, task, true);
+                maybeUpdateTasksPerMember(processState.activeTaskCount());
                 it.remove();
             }
         }
 
         // 2. re-assigning tasks to clients that previously have seen the same task (as standby task)
-        for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
+        for (final Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
             final TaskId task = it.next();
-            final Set<Member> prevMembers = localState.standbyTaskToPrevMember.get(task);
-            final Member prevMember = findMemberWithLeastLoad(prevMembers, task, true);
+            final ArrayList<Member> prevMembers = localState.standbyTaskToPrevMember.get(task);
+            final Member prevMember = findPrevMemberWithLeastLoad(prevMembers, null);
             if (prevMember != null && hasUnfulfilledQuota(prevMember)) {
-                localState.processIdToState.get(prevMember.processId).addTask(prevMember.memberId, task, true);
-                updateHelpers(prevMember, true);
+                final ProcessState processState = localState.processIdToState.get(prevMember.processId);
+                processState.addTask(prevMember.memberId, task, true);
+                maybeUpdateTasksPerMember(processState.activeTaskCount());
                 it.remove();
             }
         }
 
         // 3. assign any remaining unassigned tasks
-        for (Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
+        final PriorityQueue<ProcessState> processByLoad = new PriorityQueue<>(Comparator.comparingDouble(ProcessState::load));
+        processByLoad.addAll(localState.processIdToState.values());
+        for (final Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
             final TaskId task = it.next();
-            final Set<Member> allMembers = localState.processIdToState.entrySet().stream().flatMap(entry -> entry.getValue().memberToTaskCounts().keySet().stream()
-                .map(memberId -> new Member(entry.getKey(), memberId))).collect(Collectors.toSet());
-            final Member member = findMemberWithLeastLoad(allMembers, task, false);
+            final ProcessState processWithLeastLoad = processByLoad.poll();
+            if (processWithLeastLoad == null) {
+                throw new TaskAssignorException("No process available to assign active task {}." + task);
+            }
+            final String member = memberWithLeastLoad(processWithLeastLoad);
             if (member == null) {
-                log.error("Unable to assign active task {} to any member.", task);
                 throw new TaskAssignorException("No member available to assign active task {}." + task);
             }
-            localState.processIdToState.get(member.processId).addTask(member.memberId, task, true);
+            processWithLeastLoad.addTask(member, task, true);
             it.remove();
-            updateHelpers(member, true);
-
+            maybeUpdateTasksPerMember(processWithLeastLoad.activeTaskCount());
+            processByLoad.add(processWithLeastLoad); // Add it back to the queue after updating its state
         }
     }
 
@@ -214,29 +221,75 @@ public class StickyTaskAssignor implements TaskAssignor {
         }
     }
 
-    private Member findMemberWithLeastLoad(final Set<Member> members, TaskId taskId, final boolean returnSameMember) {
+    private boolean assignStandbyToMemberWithLeastLoad(PriorityQueue<ProcessState> queue, TaskId taskId) {
+        final ProcessState processWithLeastLoad = queue.poll();
+        if (processWithLeastLoad == null) {
+            return false;
+        }
+        boolean found = false;
+        if (!processWithLeastLoad.hasTask(taskId)) {
+            final String memberId = memberWithLeastLoad(processWithLeastLoad);
+            if (memberId != null) {
+                processWithLeastLoad.addTask(memberId, taskId, false);
+                found = true;
+            }
+        } else if (!queue.isEmpty()) {
+            found = assignStandbyToMemberWithLeastLoad(queue, taskId);
+        }
+        queue.add(processWithLeastLoad); // Add it back to the queue after updating its state
+        return found;
+    }
+
+    /**
+     * Finds the previous member with the least load for a given task.
+     *
+     * @param members The list of previous members owning the task.
+     * @param taskId  The taskId, to check if the previous member already has the task. Can be null if we assign it
+     *                for the first time (e.g., during active task assignment).
+     *
+     * @return Previous member with the least load that does not have the task, or null if no such member exists.
+     */
+    private Member findPrevMemberWithLeastLoad(final ArrayList<Member> members, final TaskId taskId) {
         if (members == null || members.isEmpty()) {
             return null;
         }
-        Optional<ProcessState> processWithLeastLoad = members.stream()
-            .map(member -> localState.processIdToState.get(member.processId))
-            .min(Comparator.comparingDouble(ProcessState::load));
-        // if the same exact former member is needed
-        if (returnSameMember) {
-            return localState.standbyTaskToPrevMember.get(taskId).stream()
-                .filter(standby -> standby.processId.equals(processWithLeastLoad.get().processId()))
-                .findFirst()
-                .orElseGet(() -> memberWithLeastLoad(processWithLeastLoad.get()));
+        Member candidate = members.get(0);
+        final ProcessState candidateProcessState = localState.processIdToState.get(candidate.processId);
+        double candidateProcessLoad = candidateProcessState.load();
+        double candidateMemberLoad = candidateProcessState.memberToTaskCounts().get(candidate.memberId);
+        for (int i = 1; i < members.size(); i++) {
+            final Member member = members.get(i);
+            final ProcessState processState = localState.processIdToState.get(member.processId);
+            final double newProcessLoad = processState.load();
+            if (newProcessLoad < candidateProcessLoad && (taskId == null || !processState.hasTask(taskId))) {
+                final double newMemberLoad = processState.memberToTaskCounts().get(member.memberId);
+                if (newMemberLoad < candidateMemberLoad) {
+                    candidateProcessLoad = newProcessLoad;
+                    candidateMemberLoad = newMemberLoad;
+                    candidate = member;
+                }
+            }
         }
-        return memberWithLeastLoad(processWithLeastLoad.get());
+
+        if (taskId == null || !candidateProcessState.hasTask(taskId)) {
+            return candidate;
+        }
+        return null;
     }
 
-    private Member memberWithLeastLoad(final ProcessState processWithLeastLoad) {
-        Optional<String> memberWithLeastLoad = processWithLeastLoad.memberToTaskCounts().entrySet().stream()
+    private String memberWithLeastLoad(final ProcessState processWithLeastLoad) {
+        final Map<String, Integer> members = processWithLeastLoad.memberToTaskCounts();
+        if (members.isEmpty()) {
+            return null;
+        }
+        if (members.size() == 1) {
+            return members.keySet().iterator().next();
+        }
+        final Optional<String> memberWithLeastLoad = processWithLeastLoad.memberToTaskCounts().entrySet().stream()
             .min(Map.Entry.comparingByValue())
             .map(Map.Entry::getKey);
-        return memberWithLeastLoad.map(memberId -> new Member(processWithLeastLoad.processId(), memberId)).orElse(null);
+        return memberWithLeastLoad.orElse(null);
     }
 
     private boolean hasUnfulfilledQuota(final Member member) {
@@ -244,55 +297,49 @@ public class StickyTaskAssignor implements TaskAssignor {
     }
 
     private void assignStandby(final Set<TaskId> standbyTasks, final int numStandbyReplicas) {
-        for (TaskId task : standbyTasks) {
+        final ArrayList<StandbyToAssign> toLeastLoaded = new ArrayList<>(standbyTasks.size() * numStandbyReplicas);
+        for (final TaskId task : standbyTasks) {
             for (int i = 0; i < numStandbyReplicas; i++) {
-                final Set<String> availableProcesses = localState.processIdToState.values().stream()
-                    .filter(process -> !process.hasTask(task))
-                    .map(ProcessState::processId)
-                    .collect(Collectors.toSet());
-
-                if (availableProcesses.isEmpty()) {
-                    log.warn("{} There is not enough available capacity. " +
-                        "You should increase the number of threads and/or application instances to maintain the requested number of standby replicas.",
-                        errorMessage(numStandbyReplicas, i, task));
-                    break;
-                }
-                Member standby = null;
-
-                // prev active task
-                Member prevMember = localState.activeTaskToPrevMember.get(task);
-                if (prevMember != null && availableProcesses.contains(prevMember.processId) && isLoadBalanced(prevMember.processId)) {
-                    standby = prevMember;
+                final Member prevMember = localState.activeTaskToPrevMember.get(task);
+                if (prevMember != null) {
+                    final ProcessState prevMemberProcessState = localState.processIdToState.get(prevMember.processId);
+                    if (!prevMemberProcessState.hasTask(task) && isLoadBalanced(prevMemberProcessState)) {
+                        prevMemberProcessState.addTask(prevMember.memberId, task, false);
+                        continue;
+                    }
                 }
 
                 // prev standby tasks
-                if (standby == null) {
-                    final Set<Member> prevMembers = localState.standbyTaskToPrevMember.get(task);
-                    if (prevMembers != null && !prevMembers.isEmpty()) {
-                        prevMembers.removeIf(member -> !availableProcesses.contains(member.processId));
-                        prevMember = findMemberWithLeastLoad(prevMembers, task, true);
-                        if (prevMember != null && isLoadBalanced(prevMember.processId)) {
-                            standby = prevMember;
+                final ArrayList<Member> prevMembers = localState.standbyTaskToPrevMember.get(task);
+                if (prevMembers != null && !prevMembers.isEmpty()) {
+                    final Member prevMember2 = findPrevMemberWithLeastLoad(prevMembers, task);
+                    if (prevMember2 != null) {
+                        final ProcessState prevMemberProcessState = localState.processIdToState.get(prevMember2.processId);
+                        if (isLoadBalanced(prevMemberProcessState)) {
+                            prevMemberProcessState.addTask(prevMember2.memberId, task, false);
+                            continue;
                         }
                     }
                 }
 
-                // others
-                if (standby == null) {
-                    final Set<Member> availableMembers = availableProcesses.stream()
-                        .flatMap(pId -> localState.processIdToState.get(pId).memberToTaskCounts().keySet().stream()
-                            .map(mId -> new Member(pId, mId))).collect(Collectors.toSet());
-                    standby = findMemberWithLeastLoad(availableMembers, task, false);
-                    if (standby == null) {
-                        log.warn("{} Error in standby task assignment!", errorMessage(numStandbyReplicas, i, task));
-                        break;
-                    }
-                }
-                localState.processIdToState.get(standby.processId).addTask(standby.memberId, task, false);
-                updateHelpers(standby, false);
+                toLeastLoaded.add(new StandbyToAssign(task, numStandbyReplicas - i));
+                break;
             }
+        }
+
+        final PriorityQueue<ProcessState> processByLoad = new PriorityQueue<>(Comparator.comparingDouble(ProcessState::load));
+        processByLoad.addAll(localState.processIdToState.values());
+        for (final StandbyToAssign toAssign : toLeastLoaded) {
+            for (int i = 0; i < toAssign.remainingReplicas; i++) {
+                if (!assignStandbyToMemberWithLeastLoad(processByLoad, toAssign.taskId)) {
+                    log.warn("{} There is not enough available capacity. " +
+                        "You should increase the number of threads and/or application instances to maintain the requested number of standby replicas.",
+                        errorMessage(numStandbyReplicas, i, toAssign.taskId));
+                    break;
+                }
+            }
         }
     }
@@ -301,21 +348,13 @@ public class StickyTaskAssignor implements TaskAssignor {
             " of " + numStandbyReplicas + " standby tasks for task [" + task + "].";
     }
 
-    private boolean isLoadBalanced(final String processId) {
-        final ProcessState process = localState.processIdToState.get(processId);
+    private boolean isLoadBalanced(final ProcessState process) {
         final double load = process.load();
-        boolean isLeastLoadedProcess = localState.processIdToState.values().stream()
+        final boolean isLeastLoadedProcess = localState.processIdToState.values().stream()
             .allMatch(p -> p.load() >= load);
         return process.hasCapacity() || isLeastLoadedProcess;
     }
 
-    private void updateHelpers(final Member member, final boolean isActive) {
-        if (isActive) {
-            // update task per process
-            maybeUpdateTasksPerMember(localState.processIdToState.get(member.processId).activeTaskCount());
-        }
-    }
-
     private static int computeTasksPerMember(final int numberOfTasks, final int numberOfMembers) {
         if (numberOfMembers == 0) {
             return 0;
@@ -326,7 +365,17 @@ public class StickyTaskAssignor implements TaskAssignor {
         }
         return tasksPerMember;
     }
-
+
+    static class StandbyToAssign {
+        private final TaskId taskId;
+        private final int remainingReplicas;
+
+        public StandbyToAssign(final TaskId taskId, final int remainingReplicas) {
+            this.taskId = taskId;
+            this.remainingReplicas = remainingReplicas;
+        }
+    }
+
     static class Member {
         private final String processId;
         private final String memberId;
@@ -340,11 +389,11 @@ public class StickyTaskAssignor implements TaskAssignor {
     private static class LocalState {
         // helper data structures:
         Map<TaskId, Member> activeTaskToPrevMember;
-        Map<TaskId, Set<Member>> standbyTaskToPrevMember;
+        Map<TaskId, ArrayList<Member>> standbyTaskToPrevMember;
         Map<String, ProcessState> processIdToState;
         int allTasks;
         int totalCapacity;
         int tasksPerMember;
     }
 
-}
\ No newline at end of file
+}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java
index 5d26f9a9e12..b4fa9c4db99 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java
@@ -833,6 +833,265 @@ public class StickyTaskAssignorTest {
         assertEquals(2, getAllActiveTaskIds(result, "newMember").size());
     }
 
+    @Test
+    public void shouldHandleLargeNumberOfTasksWithStandbyAssignment() {
+        final int numTasks = 100;
+        final int numClients = 5;
+        final int numStandbyReplicas = 2;
+
+        Map<String, AssignmentMemberSpec> members = new HashMap<>();
+        for (int i = 0; i < numClients; i++) {
+            members.put("member" + i, createAssignmentMemberSpec("process" + i));
+        }
+
+        GroupAssignment result =
assignor.assign( + new GroupSpecImpl(members, mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, String.valueOf(numStandbyReplicas)))), + new TopologyDescriberImpl(numTasks, true, List.of("test-subtopology")) + ); + + // Verify all active tasks are assigned + Set allActiveTasks = new HashSet<>(); + for (String memberId : result.members().keySet()) { + List memberActiveTasks = getAllActiveTaskIds(result, memberId); + allActiveTasks.addAll(memberActiveTasks); + } + assertEquals(numTasks, allActiveTasks.size()); + + // Verify standby tasks are assigned (should be numTasks * numStandbyReplicas total) + Set allStandbyTasks = new HashSet<>(); + for (String memberId : result.members().keySet()) { + List memberStandbyTasks = getAllStandbyTaskIds(result, memberId); + allStandbyTasks.addAll(memberStandbyTasks); + } + // With 5 clients and 2 standby replicas, we should have at least some standby tasks + assertTrue(allStandbyTasks.size() > 0, "Should have some standby tasks assigned"); + // Maximum possible = numTasks * min(numStandbyReplicas, numClients - 1) = 100 * 2 = 200 + int maxPossibleStandbyTasks = numTasks * Math.min(numStandbyReplicas, numClients - 1); + assertTrue(allStandbyTasks.size() <= maxPossibleStandbyTasks, + "Should not exceed maximum possible standby tasks: " + maxPossibleStandbyTasks); + + // Verify no client has both active and standby for the same task + for (String memberId : result.members().keySet()) { + Set memberActiveTasks = new HashSet<>(getAllActiveTaskIds(result, memberId)); + Set memberStandbyTasks = new HashSet<>(getAllStandbyTaskIds(result, memberId)); + memberActiveTasks.retainAll(memberStandbyTasks); + assertTrue(memberActiveTasks.isEmpty(), "Client " + memberId + " has both active and standby for same task"); + } + + // Verify load distribution is reasonable + int minActiveTasks = Integer.MAX_VALUE; + int maxActiveTasks = 0; + for (String memberId : result.members().keySet()) { + int activeTaskCount = getAllActiveTaskCount(result, memberId); + minActiveTasks = Math.min(minActiveTasks, activeTaskCount); + maxActiveTasks = Math.max(maxActiveTasks, activeTaskCount); + } + // With 100 tasks and 5 clients, each should have 20 tasks + assertEquals(20, minActiveTasks); + assertEquals(20, maxActiveTasks); + + // Verify standby task distribution is reasonable + int minStandbyTasks = Integer.MAX_VALUE; + int maxStandbyTasks = 0; + for (String memberId : result.members().keySet()) { + int standbyTaskCount = getAllStandbyTaskIds(result, memberId).size(); + minStandbyTasks = Math.min(minStandbyTasks, standbyTaskCount); + maxStandbyTasks = Math.max(maxStandbyTasks, standbyTaskCount); + } + // Each client should have some standby tasks, but not necessarily equal distribution + assertTrue(minStandbyTasks >= 0); + assertTrue(maxStandbyTasks > 0); + } + + @Test + public void shouldHandleOddNumberOfClientsWithStandbyTasks() { + // Test with odd number of clients (7) and even number of tasks (14) + final int numTasks = 14; + final int numClients = 7; + final int numStandbyReplicas = 1; + + Map members = new HashMap<>(); + for (int i = 0; i < numClients; i++) { + members.put("member" + i, createAssignmentMemberSpec("process" + i)); + } + + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, String.valueOf(numStandbyReplicas)))), + new TopologyDescriberImpl(numTasks, true, List.of("test-subtopology")) + ); + + // Verify all active tasks are assigned + Set allActiveTasks = new HashSet<>(); + for (String memberId : 
result.members().keySet()) { + List memberActiveTasks = getAllActiveTaskIds(result, memberId); + allActiveTasks.addAll(memberActiveTasks); + } + assertEquals(numTasks, allActiveTasks.size()); + + // Verify standby tasks are assigned + Set allStandbyTasks = new HashSet<>(); + for (String memberId : result.members().keySet()) { + List memberStandbyTasks = getAllStandbyTaskIds(result, memberId); + allStandbyTasks.addAll(memberStandbyTasks); + } + assertEquals(numTasks * numStandbyReplicas, allStandbyTasks.size()); + + // With 14 tasks and 7 clients, each client should have 2 active tasks + int expectedTasksPerClient = numTasks / numClients; // 14 / 7 = 2 + int remainder = numTasks % numClients; // 14 % 7 = 0 + + int clientsWithExpectedTasks = 0; + int clientsWithOneMoreTask = 0; + for (String memberId : result.members().keySet()) { + int activeTaskCount = getAllActiveTaskCount(result, memberId); + if (activeTaskCount == expectedTasksPerClient) { + clientsWithExpectedTasks++; + } else if (activeTaskCount == expectedTasksPerClient + 1) { + clientsWithOneMoreTask++; + } + } + assertEquals(numClients - remainder, clientsWithExpectedTasks); // 7 clients should have 2 tasks + assertEquals(remainder, clientsWithOneMoreTask); // 0 clients should have 3 tasks + } + + @Test + public void shouldHandleHighStandbyReplicaCount() { + // Test with high number of standby replicas (5) and limited clients (3) + final int numTasks = 6; + final int numClients = 3; + final int numStandbyReplicas = 5; + + Map members = new HashMap<>(); + for (int i = 0; i < numClients; i++) { + members.put("member" + i, createAssignmentMemberSpec("process" + i)); + } + + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, String.valueOf(numStandbyReplicas)))), + new TopologyDescriberImpl(numTasks, true, List.of("test-subtopology")) + ); + + // Verify all active tasks are assigned + Set allActiveTasks = new HashSet<>(); + for (String memberId : result.members().keySet()) { + List memberActiveTasks = getAllActiveTaskIds(result, memberId); + allActiveTasks.addAll(memberActiveTasks); + } + assertEquals(numTasks, allActiveTasks.size()); + + // With only 3 clients and 5 standby replicas, not all standby replicas can be assigned + // since each client can only hold standby tasks for tasks it doesn't have active + Set allStandbyTasks = new HashSet<>(); + for (String memberId : result.members().keySet()) { + List memberStandbyTasks = getAllStandbyTaskIds(result, memberId); + allStandbyTasks.addAll(memberStandbyTasks); + } + + // Maximum possible = numTasks * min(numStandbyReplicas, numClients - 1) = 6 * 2 = 12 + int maxPossibleStandbyTasks = numTasks * Math.min(numStandbyReplicas, numClients - 1); + assertTrue(allStandbyTasks.size() <= maxPossibleStandbyTasks); + assertTrue(allStandbyTasks.size() > 0); // Should assign at least some standby tasks + } + + @Test + public void shouldHandleLargeNumberOfSubtopologiesWithStandbyTasks() { + // Test with many subtopologies (10) each with different numbers of tasks + final int numSubtopologies = 10; + final int numClients = 4; + final int numStandbyReplicas = 1; + + List subtopologies = new ArrayList<>(); + for (int i = 0; i < numSubtopologies; i++) { + subtopologies.add("subtopology-" + i); + } + + Map members = new HashMap<>(); + for (int i = 0; i < numClients; i++) { + members.put("member" + i, createAssignmentMemberSpec("process" + i)); + } + + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, 
mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, String.valueOf(numStandbyReplicas)))), + new TopologyDescriberImpl(5, true, subtopologies) // 5 tasks per subtopology + ); + + // Verify all subtopologies have tasks assigned + Set subtopologiesWithTasks = new HashSet<>(); + for (String memberId : result.members().keySet()) { + MemberAssignment member = result.members().get(memberId); + subtopologiesWithTasks.addAll(member.activeTasks().keySet()); + } + assertEquals(numSubtopologies, subtopologiesWithTasks.size()); + + // Verify standby tasks are assigned across subtopologies + Set subtopologiesWithStandbyTasks = new HashSet<>(); + for (String memberId : result.members().keySet()) { + MemberAssignment member = result.members().get(memberId); + subtopologiesWithStandbyTasks.addAll(member.standbyTasks().keySet()); + } + assertEquals(numSubtopologies, subtopologiesWithStandbyTasks.size()); + } + + @Test + public void shouldHandleEdgeCaseWithSingleClientAndMultipleStandbyReplicas() { + // Test edge case: single client with multiple standby replicas + final int numTasks = 10; + final int numStandbyReplicas = 3; + + Map members = mkMap( + mkEntry("member1", createAssignmentMemberSpec("process1")) + ); + + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, String.valueOf(numStandbyReplicas)))), + new TopologyDescriberImpl(numTasks, true, List.of("test-subtopology")) + ); + + // Single client should get all active tasks + assertEquals(numTasks, getAllActiveTaskCount(result, "member1")); + + // No standby tasks should be assigned since there's only one client + // (standby tasks can't be assigned to the same client as active tasks) + assertTrue(getAllStandbyTaskIds(result, "member1").isEmpty()); + } + + @Test + public void shouldHandleEdgeCaseWithMoreStandbyReplicasThanAvailableClients() { + // Test edge case: more standby replicas than available clients + final int numTasks = 4; + final int numClients = 2; + final int numStandbyReplicas = 5; // More than available clients + + Map members = new HashMap<>(); + for (int i = 0; i < numClients; i++) { + members.put("member" + i, createAssignmentMemberSpec("process" + i)); + } + + GroupAssignment result = assignor.assign( + new GroupSpecImpl(members, mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, String.valueOf(numStandbyReplicas)))), + new TopologyDescriberImpl(numTasks, true, List.of("test-subtopology")) + ); + + // Verify all active tasks are assigned + Set allActiveTasks = new HashSet<>(); + for (String memberId : result.members().keySet()) { + List memberActiveTasks = getAllActiveTaskIds(result, memberId); + allActiveTasks.addAll(memberActiveTasks); + } + assertEquals(numTasks, allActiveTasks.size()); + + // With only 2 clients, maximum standby tasks per task = 1 (since each client has active tasks) + Set allStandbyTasks = new HashSet<>(); + for (String memberId : result.members().keySet()) { + List memberStandbyTasks = getAllStandbyTaskIds(result, memberId); + allStandbyTasks.addAll(memberStandbyTasks); + } + + // Maximum possible = numTasks * 1 = 4 + assertEquals(numTasks, allStandbyTasks.size()); + } + + private int getAllActiveTaskCount(GroupAssignment result, String... 
memberIds) { int size = 0; for (String memberId : memberIds) { diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java index fbd9a3a14b1..7370d488757 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java @@ -573,7 +573,7 @@ public class RestoreIntegrationTest { createStateForRestoration(inputStream, 0); if (useNewProtocol) { - CLUSTER.setStandbyReplicas(appId, 1); + CLUSTER.setGroupStandbyReplicas(appId, 1); } final Properties props1 = props(stateUpdaterEnabled); diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java index c40b3433a91..d9df2077b2c 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java @@ -140,15 +140,19 @@ public class SmokeTestDriverIntegrationTest { final Properties props = new Properties(); + final String appId = safeUniqueTestName(testInfo); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, safeUniqueTestName(testInfo)); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); props.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled); props.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled); - // decrease the session timeout so that we can trigger the rebalance soon after old client left closed - props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000); - props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 500); if (streamsProtocolEnabled) { props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault())); + // decrease the session timeout so that we can trigger the rebalance soon after old client left closed + CLUSTER.setGroupSessionTimeout(appId, 10000); + CLUSTER.setGroupHeartbeatTimeout(appId, 1000); + } else { + // decrease the session timeout so that we can trigger the rebalance soon after old client left closed + props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000); } // cycle out Streams instances as long as the test is running. 
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java index ec48b8b3634..8c8ef3dae9c 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java @@ -99,7 +99,7 @@ public class StandbyTaskCreationIntegrationTest { streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class); if (streamsProtocolEnabled) { streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault())); - CLUSTER.setStandbyReplicas("app-" + safeTestName, 1); + CLUSTER.setGroupStandbyReplicas("app-" + safeTestName, 1); } else { streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); } diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java index f425f8365ee..1de7a45bfce 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java @@ -401,6 +401,8 @@ public class EmbeddedKafkaCluster { private void addDefaultBrokerPropsIfAbsent(final Properties brokerConfig) { brokerConfig.putIfAbsent(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, 2 * 1024 * 1024L); + brokerConfig.putIfAbsent(GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, "100"); + brokerConfig.putIfAbsent(GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, "100"); brokerConfig.putIfAbsent(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, "0"); brokerConfig.putIfAbsent(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "0"); brokerConfig.putIfAbsent(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "5"); @@ -439,7 +441,33 @@ public class EmbeddedKafkaCluster { } } - public void setStandbyReplicas(final String groupId, final int numStandbyReplicas) { + public void setGroupSessionTimeout(final String groupId, final int sessionTimeoutMs) { + try (final Admin adminClient = createAdminClient()) { + adminClient.incrementalAlterConfigs( + Map.of( + new ConfigResource(ConfigResource.Type.GROUP, groupId), + List.of(new AlterConfigOp(new ConfigEntry(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, String.valueOf(sessionTimeoutMs)), AlterConfigOp.OpType.SET)) + ) + ).all().get(); + } catch (final InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + public void setGroupHeartbeatTimeout(final String groupId, final int heartbeatTimeoutMs) { + try (final Admin adminClient = createAdminClient()) { + adminClient.incrementalAlterConfigs( + Map.of( + new ConfigResource(ConfigResource.Type.GROUP, groupId), + List.of(new AlterConfigOp(new ConfigEntry(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, String.valueOf(heartbeatTimeoutMs)), AlterConfigOp.OpType.SET)) + ) + ).all().get(); + } catch (final InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + public void setGroupStandbyReplicas(final String groupId, 
final int numStandbyReplicas) { try (final Admin adminClient = createAdminClient()) { adminClient.incrementalAlterConfigs( Map.of(