From a1b01f48e9ef06291f3e052a37bde887b891e708 Mon Sep 17 00:00:00 2001 From: Bill Bejeck Date: Tue, 6 Mar 2018 00:24:22 -0500 Subject: [PATCH] KAFKA-6309: Improve task assignor load balance (#4624) Sorts TaskIds on first assignment evenly distributing tasks by topicGroupId should help with evening the load of work across topologies. This PR is an initial "strawman" approach which will be followed up (at a later date YTBD) by scoring or assigning weight to processing nodes to ensure even processing distribution. Added a new test to existing unit test. --- .../assignment/StickyTaskAssignor.java | 10 ++-- .../assignment/StickyTaskAssignorTest.java | 55 +++++++++++++++++++ 2 files changed, 61 insertions(+), 4 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java index de8fa57e36a..5b54d08c032 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java @@ -20,10 +20,13 @@ import org.apache.kafka.streams.processor.TaskId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -106,14 +109,13 @@ public class StickyTaskAssignor implements TaskAssignor { } // assign any remaining unassigned tasks - for (final TaskId taskId : unassigned) { + List sortedTasks = new ArrayList<>(unassigned); + Collections.sort(sortedTasks); + for (final TaskId taskId : sortedTasks) { allocateTaskWithClientCandidates(taskId, clients.keySet(), true); } - } - - private void allocateTaskWithClientCandidates(final TaskId taskId, final Set clientsWithin, final boolean active) { final ClientState client = findClient(taskId, clientsWithin, active); taskPairs.addPairs(taskId, client.assignedTasks()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java index 4f770c86239..ed22e3c30de 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java @@ -23,11 +23,13 @@ import org.junit.Test; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.TreeSet; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; @@ -350,6 +352,42 @@ public class StickyTaskAssignorTest { assertThat(clients.get(p1).assignedTaskCount(), equalTo(4)); } + @Test + public void shouldEvenlyDistributeByTaskIdAndPartition() { + createClient(p1, 4); + createClient(p2, 4); + createClient(p3, 4); + createClient(p4, 4); + + final List taskIds = new ArrayList<>(); + final TaskId[] taskIdArray = new TaskId[16]; + + for (int i = 1; i <= 2; i++) { + for (int j = 0; j < 8; j++) { + taskIds.add(new TaskId(i, j)); + } + } + + Collections.shuffle(taskIds); + taskIds.toArray(taskIdArray); + + final StickyTaskAssignor taskAssignor = createTaskAssignor(taskIdArray); + taskAssignor.assign(0); + + Collections.sort(taskIds); + final Set expectedClientOneAssignment = getExpectedTaskIdAssignment(taskIds, 0, 4, 8, 12); + final Set expectedClientTwoAssignment = getExpectedTaskIdAssignment(taskIds, 1, 5, 9, 13); + final Set expectedClientThreeAssignment = getExpectedTaskIdAssignment(taskIds, 2, 6, 10, 14); + final Set expectedClientFourAssignment = getExpectedTaskIdAssignment(taskIds, 3, 7, 11, 15); + + final Map> sortedAssignments = sortClientAssignments(clients); + + assertThat(sortedAssignments.get(p1), equalTo(expectedClientOneAssignment)); + assertThat(sortedAssignments.get(p2), equalTo(expectedClientTwoAssignment)); + assertThat(sortedAssignments.get(p3), equalTo(expectedClientThreeAssignment)); + assertThat(sortedAssignments.get(p4), equalTo(expectedClientFourAssignment)); + } + @Test public void shouldNotHaveSameAssignmentOnAnyTwoHosts() { @@ -665,4 +703,21 @@ public class StickyTaskAssignorTest { } } + private Map> sortClientAssignments(final Map clients) { + final Map> sortedAssignments = new HashMap<>(); + for (final Map.Entry entry : clients.entrySet()) { + final Set sorted = new TreeSet<>(entry.getValue().activeTasks()); + sortedAssignments.put(entry.getKey(), sorted); + } + return sortedAssignments; + } + + private Set getExpectedTaskIdAssignment(final List tasks, final int... indices) { + final Set sortedAssignment = new TreeSet<>(); + for (final int index : indices) { + sortedAssignment.add(tasks.get(index)); + } + return sortedAssignment; + } + }