KAFKA-6145: Add unit tests for assignments of only stateless tasks (#8713)

Reviewers: John Roesler <vvcephei@apache.org>
This commit is contained in:
Bruno Cadonna 2020-05-22 22:35:25 +02:00 committed by GitHub
parent 713f305172
commit ec205171e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 125 additions and 1 deletions

View File

@ -253,7 +253,7 @@ public class HighAvailabilityTaskAssignorTest {
}
@Test
public void shouldAssignActiveStatefulTasksEvenlyOverClientsAndStreamThreadsWithMoreStreamThreadsThanTasks() {
public void shouldAssignActiveStatefulTasksEvenlyOverClientsAndStreamThreadsWithEqualStreamThreadsPerClientAsTasks() {
final Set<TaskId> allTaskIds = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_1_0, TASK_1_1, TASK_1_2, TASK_2_0, TASK_2_1, TASK_2_2);
final Map<TaskId, Long> lags = allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 10L));
final ClientState clientState1 = new ClientState(emptySet(), emptySet(), lags, 9);
@ -683,6 +683,130 @@ public class HighAvailabilityTaskAssignorTest {
assertThat(client2.standbyTaskCount(), equalTo(1));
}
@Test
public void shouldDistributeStatelessTasksEvenlyOverClientsWithEqualStreamThreadsPerClientAsTasksAndNoStatefulTasks() {
final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3, TASK_1_0, TASK_1_1, TASK_1_2);
final Set<TaskId> statefulTasks = EMPTY_TASKS;
final Set<TaskId> statelessTasks = new HashSet<>(allTasks);
final Map<TaskId, Long> taskLags = new HashMap<>();
final ClientState client1 = new ClientState(emptySet(), emptySet(), taskLags, 7);
final ClientState client2 = new ClientState(emptySet(), emptySet(), taskLags, 7);
final ClientState client3 = new ClientState(emptySet(), emptySet(), taskLags, 7);
final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2, client3);
final boolean probingRebalanceNeeded = new HighAvailabilityTaskAssignor().assign(
clientStates,
allTasks,
statefulTasks,
new AssignmentConfigs(0L, 0, 0, 0L)
);
assertValidAssignment(
0,
EMPTY_TASKS,
statelessTasks,
clientStates,
new StringBuilder()
);
assertBalancedActiveAssignment(clientStates, new StringBuilder());
assertThat(probingRebalanceNeeded, is(false));
}
@Test
public void shouldDistributeStatelessTasksEvenlyOverClientsWithLessStreamThreadsPerClientAsTasksAndNoStatefulTasks() {
final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3, TASK_1_0, TASK_1_1, TASK_1_2);
final Set<TaskId> statefulTasks = EMPTY_TASKS;
final Set<TaskId> statelessTasks = new HashSet<>(allTasks);
final Map<TaskId, Long> taskLags = new HashMap<>();
final ClientState client1 = new ClientState(emptySet(), emptySet(), taskLags, 2);
final ClientState client2 = new ClientState(emptySet(), emptySet(), taskLags, 2);
final ClientState client3 = new ClientState(emptySet(), emptySet(), taskLags, 2);
final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2, client3);
final boolean probingRebalanceNeeded = new HighAvailabilityTaskAssignor().assign(
clientStates,
allTasks,
statefulTasks,
new AssignmentConfigs(0L, 0, 0, 0L)
);
assertValidAssignment(
0,
EMPTY_TASKS,
statelessTasks,
clientStates,
new StringBuilder()
);
assertBalancedActiveAssignment(clientStates, new StringBuilder());
assertThat(probingRebalanceNeeded, is(false));
}
@Test
public void shouldDistributeStatelessTasksEvenlyOverClientsWithUnevenlyDistributedStreamThreadsAndNoStatefulTasks() {
final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3, TASK_1_0, TASK_1_1, TASK_1_2);
final Set<TaskId> statefulTasks = EMPTY_TASKS;
final Set<TaskId> statelessTasks = new HashSet<>(allTasks);
final Map<TaskId, Long> taskLags = new HashMap<>();
final ClientState client1 = new ClientState(emptySet(), emptySet(), taskLags, 1);
final ClientState client2 = new ClientState(emptySet(), emptySet(), taskLags, 2);
final ClientState client3 = new ClientState(emptySet(), emptySet(), taskLags, 3);
final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2, client3);
final boolean probingRebalanceNeeded = new HighAvailabilityTaskAssignor().assign(
clientStates,
allTasks,
statefulTasks,
new AssignmentConfigs(0L, 0, 0, 0L)
);
assertValidAssignment(
0,
EMPTY_TASKS,
statelessTasks,
clientStates,
new StringBuilder()
);
assertBalancedActiveAssignment(clientStates, new StringBuilder());
assertThat(probingRebalanceNeeded, is(false));
}
@Test
public void shouldDistributeStatelessTasksEvenlyWithPreviousAssignmentAndNoStatefulTasks() {
final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3, TASK_1_0, TASK_1_1, TASK_1_2);
final Set<TaskId> statefulTasks = EMPTY_TASKS;
final Set<TaskId> statelessTasks = new HashSet<>(allTasks);
final Map<TaskId, Long> taskLags = new HashMap<>();
final ClientState client1 = new ClientState(statelessTasks, emptySet(), taskLags, 3);
final ClientState client2 = new ClientState(emptySet(), emptySet(), taskLags, 3);
final ClientState client3 = new ClientState(emptySet(), emptySet(), taskLags, 3);
final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2, client3);
final boolean probingRebalanceNeeded = new HighAvailabilityTaskAssignor().assign(
clientStates,
allTasks,
statefulTasks,
new AssignmentConfigs(0L, 0, 0, 0L)
);
assertValidAssignment(
0,
EMPTY_TASKS,
statelessTasks,
clientStates,
new StringBuilder()
);
assertBalancedActiveAssignment(clientStates, new StringBuilder());
assertThat(probingRebalanceNeeded, is(false));
}
private static void assertHasNoActiveTasks(final ClientState... clients) {
for (final ClientState client : clients) {
assertThat(client.activeTasks(), is(empty()));