KAFKA-6309: Improve task assignor load balance (#4624)

Sorts TaskIds on first assignment evenly distributing tasks by topicGroupId should help with evening the load of work across topologies. This PR is an initial "strawman" approach which will be followed up (at a later date YTBD) by scoring or assigning weight to processing nodes to ensure even processing distribution.

Added a new test to existing unit test.
This commit is contained in:
Bill Bejeck 2018-03-06 00:24:22 -05:00 committed by Guozhang Wang
parent 8f2c087166
commit a1b01f48e9
2 changed files with 61 additions and 4 deletions

View File

@ -20,10 +20,13 @@ import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@ -106,14 +109,13 @@ public class StickyTaskAssignor<ID> implements TaskAssignor<ID, TaskId> {
}
// assign any remaining unassigned tasks
for (final TaskId taskId : unassigned) {
List<TaskId> sortedTasks = new ArrayList<>(unassigned);
Collections.sort(sortedTasks);
for (final TaskId taskId : sortedTasks) {
allocateTaskWithClientCandidates(taskId, clients.keySet(), true);
}
}
private void allocateTaskWithClientCandidates(final TaskId taskId, final Set<ID> clientsWithin, final boolean active) {
final ClientState client = findClient(taskId, clientsWithin, active);
taskPairs.addPairs(taskId, client.assignedTasks());

View File

@ -23,11 +23,13 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
@ -350,6 +352,42 @@ public class StickyTaskAssignorTest {
assertThat(clients.get(p1).assignedTaskCount(), equalTo(4));
}
@Test
public void shouldEvenlyDistributeByTaskIdAndPartition() {
createClient(p1, 4);
createClient(p2, 4);
createClient(p3, 4);
createClient(p4, 4);
final List<TaskId> taskIds = new ArrayList<>();
final TaskId[] taskIdArray = new TaskId[16];
for (int i = 1; i <= 2; i++) {
for (int j = 0; j < 8; j++) {
taskIds.add(new TaskId(i, j));
}
}
Collections.shuffle(taskIds);
taskIds.toArray(taskIdArray);
final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(taskIdArray);
taskAssignor.assign(0);
Collections.sort(taskIds);
final Set<TaskId> expectedClientOneAssignment = getExpectedTaskIdAssignment(taskIds, 0, 4, 8, 12);
final Set<TaskId> expectedClientTwoAssignment = getExpectedTaskIdAssignment(taskIds, 1, 5, 9, 13);
final Set<TaskId> expectedClientThreeAssignment = getExpectedTaskIdAssignment(taskIds, 2, 6, 10, 14);
final Set<TaskId> expectedClientFourAssignment = getExpectedTaskIdAssignment(taskIds, 3, 7, 11, 15);
final Map<Integer, Set<TaskId>> sortedAssignments = sortClientAssignments(clients);
assertThat(sortedAssignments.get(p1), equalTo(expectedClientOneAssignment));
assertThat(sortedAssignments.get(p2), equalTo(expectedClientTwoAssignment));
assertThat(sortedAssignments.get(p3), equalTo(expectedClientThreeAssignment));
assertThat(sortedAssignments.get(p4), equalTo(expectedClientFourAssignment));
}
@Test
public void shouldNotHaveSameAssignmentOnAnyTwoHosts() {
@ -665,4 +703,21 @@ public class StickyTaskAssignorTest {
}
}
private Map<Integer, Set<TaskId>> sortClientAssignments(final Map<Integer, ClientState> clients) {
final Map<Integer, Set<TaskId>> sortedAssignments = new HashMap<>();
for (final Map.Entry<Integer, ClientState> entry : clients.entrySet()) {
final Set<TaskId> sorted = new TreeSet<>(entry.getValue().activeTasks());
sortedAssignments.put(entry.getKey(), sorted);
}
return sortedAssignments;
}
private Set<TaskId> getExpectedTaskIdAssignment(final List<TaskId> tasks, final int... indices) {
final Set<TaskId> sortedAssignment = new TreeSet<>();
for (final int index : indices) {
sortedAssignment.add(tasks.get(index));
}
return sortedAssignment;
}
}