diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java index 7e07d307826..6cbbdcf5e37 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java @@ -253,7 +253,7 @@ public class HighAvailabilityTaskAssignorTest { } @Test - public void shouldAssignActiveStatefulTasksEvenlyOverClientsAndStreamThreadsWithMoreStreamThreadsThanTasks() { + public void shouldAssignActiveStatefulTasksEvenlyOverClientsAndStreamThreadsWithEqualStreamThreadsPerClientAsTasks() { final Set allTaskIds = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_1_0, TASK_1_1, TASK_1_2, TASK_2_0, TASK_2_1, TASK_2_2); final Map lags = allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 10L)); final ClientState clientState1 = new ClientState(emptySet(), emptySet(), lags, 9); @@ -683,6 +683,130 @@ public class HighAvailabilityTaskAssignorTest { assertThat(client2.standbyTaskCount(), equalTo(1)); } + @Test + public void shouldDistributeStatelessTasksEvenlyOverClientsWithEqualStreamThreadsPerClientAsTasksAndNoStatefulTasks() { + final Set allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3, TASK_1_0, TASK_1_1, TASK_1_2); + final Set statefulTasks = EMPTY_TASKS; + final Set statelessTasks = new HashSet<>(allTasks); + + final Map taskLags = new HashMap<>(); + final ClientState client1 = new ClientState(emptySet(), emptySet(), taskLags, 7); + final ClientState client2 = new ClientState(emptySet(), emptySet(), taskLags, 7); + final ClientState client3 = new ClientState(emptySet(), emptySet(), taskLags, 7); + + final Map clientStates = getClientStatesMap(client1, client2, client3); + + final boolean probingRebalanceNeeded = new HighAvailabilityTaskAssignor().assign( + clientStates, + allTasks, + statefulTasks, + new AssignmentConfigs(0L, 0, 0, 0L) + ); + + assertValidAssignment( + 0, + EMPTY_TASKS, + statelessTasks, + clientStates, + new StringBuilder() + ); + assertBalancedActiveAssignment(clientStates, new StringBuilder()); + assertThat(probingRebalanceNeeded, is(false)); + } + + @Test + public void shouldDistributeStatelessTasksEvenlyOverClientsWithLessStreamThreadsPerClientAsTasksAndNoStatefulTasks() { + final Set allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3, TASK_1_0, TASK_1_1, TASK_1_2); + final Set statefulTasks = EMPTY_TASKS; + final Set statelessTasks = new HashSet<>(allTasks); + + final Map taskLags = new HashMap<>(); + final ClientState client1 = new ClientState(emptySet(), emptySet(), taskLags, 2); + final ClientState client2 = new ClientState(emptySet(), emptySet(), taskLags, 2); + final ClientState client3 = new ClientState(emptySet(), emptySet(), taskLags, 2); + + final Map clientStates = getClientStatesMap(client1, client2, client3); + + final boolean probingRebalanceNeeded = new HighAvailabilityTaskAssignor().assign( + clientStates, + allTasks, + statefulTasks, + new AssignmentConfigs(0L, 0, 0, 0L) + ); + + assertValidAssignment( + 0, + EMPTY_TASKS, + statelessTasks, + clientStates, + new StringBuilder() + ); + assertBalancedActiveAssignment(clientStates, new StringBuilder()); + assertThat(probingRebalanceNeeded, is(false)); + } + + @Test + public void shouldDistributeStatelessTasksEvenlyOverClientsWithUnevenlyDistributedStreamThreadsAndNoStatefulTasks() { + final Set allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3, TASK_1_0, TASK_1_1, TASK_1_2); + final Set statefulTasks = EMPTY_TASKS; + final Set statelessTasks = new HashSet<>(allTasks); + + final Map taskLags = new HashMap<>(); + final ClientState client1 = new ClientState(emptySet(), emptySet(), taskLags, 1); + final ClientState client2 = new ClientState(emptySet(), emptySet(), taskLags, 2); + final ClientState client3 = new ClientState(emptySet(), emptySet(), taskLags, 3); + + final Map clientStates = getClientStatesMap(client1, client2, client3); + + final boolean probingRebalanceNeeded = new HighAvailabilityTaskAssignor().assign( + clientStates, + allTasks, + statefulTasks, + new AssignmentConfigs(0L, 0, 0, 0L) + ); + + assertValidAssignment( + 0, + EMPTY_TASKS, + statelessTasks, + clientStates, + new StringBuilder() + ); + assertBalancedActiveAssignment(clientStates, new StringBuilder()); + assertThat(probingRebalanceNeeded, is(false)); + } + + @Test + public void shouldDistributeStatelessTasksEvenlyWithPreviousAssignmentAndNoStatefulTasks() { + final Set allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3, TASK_1_0, TASK_1_1, TASK_1_2); + final Set statefulTasks = EMPTY_TASKS; + final Set statelessTasks = new HashSet<>(allTasks); + + final Map taskLags = new HashMap<>(); + final ClientState client1 = new ClientState(statelessTasks, emptySet(), taskLags, 3); + final ClientState client2 = new ClientState(emptySet(), emptySet(), taskLags, 3); + final ClientState client3 = new ClientState(emptySet(), emptySet(), taskLags, 3); + + final Map clientStates = getClientStatesMap(client1, client2, client3); + + final boolean probingRebalanceNeeded = new HighAvailabilityTaskAssignor().assign( + clientStates, + allTasks, + statefulTasks, + new AssignmentConfigs(0L, 0, 0, 0L) + ); + + assertValidAssignment( + 0, + EMPTY_TASKS, + statelessTasks, + clientStates, + new StringBuilder() + ); + assertBalancedActiveAssignment(clientStates, new StringBuilder()); + assertThat(probingRebalanceNeeded, is(false)); + } + private static void assertHasNoActiveTasks(final ClientState... clients) { for (final ClientState client : clients) { assertThat(client.activeTasks(), is(empty()));