diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java index de8fa57e36a..5b54d08c032 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java @@ -20,10 +20,13 @@ import org.apache.kafka.streams.processor.TaskId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -106,14 +109,13 @@ public class StickyTaskAssignor implements TaskAssignor { } // assign any remaining unassigned tasks - for (final TaskId taskId : unassigned) { + List sortedTasks = new ArrayList<>(unassigned); + Collections.sort(sortedTasks); + for (final TaskId taskId : sortedTasks) { allocateTaskWithClientCandidates(taskId, clients.keySet(), true); } - } - - private void allocateTaskWithClientCandidates(final TaskId taskId, final Set clientsWithin, final boolean active) { final ClientState client = findClient(taskId, clientsWithin, active); taskPairs.addPairs(taskId, client.assignedTasks()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java index 4f770c86239..ed22e3c30de 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java @@ -23,11 +23,13 @@ import org.junit.Test; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.TreeSet; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; @@ -350,6 +352,42 @@ public class StickyTaskAssignorTest { assertThat(clients.get(p1).assignedTaskCount(), equalTo(4)); } + @Test + public void shouldEvenlyDistributeByTaskIdAndPartition() { + createClient(p1, 4); + createClient(p2, 4); + createClient(p3, 4); + createClient(p4, 4); + + final List taskIds = new ArrayList<>(); + final TaskId[] taskIdArray = new TaskId[16]; + + for (int i = 1; i <= 2; i++) { + for (int j = 0; j < 8; j++) { + taskIds.add(new TaskId(i, j)); + } + } + + Collections.shuffle(taskIds); + taskIds.toArray(taskIdArray); + + final StickyTaskAssignor taskAssignor = createTaskAssignor(taskIdArray); + taskAssignor.assign(0); + + Collections.sort(taskIds); + final Set expectedClientOneAssignment = getExpectedTaskIdAssignment(taskIds, 0, 4, 8, 12); + final Set expectedClientTwoAssignment = getExpectedTaskIdAssignment(taskIds, 1, 5, 9, 13); + final Set expectedClientThreeAssignment = getExpectedTaskIdAssignment(taskIds, 2, 6, 10, 14); + final Set expectedClientFourAssignment = getExpectedTaskIdAssignment(taskIds, 3, 7, 11, 15); + + final Map> sortedAssignments = sortClientAssignments(clients); + + assertThat(sortedAssignments.get(p1), equalTo(expectedClientOneAssignment)); + assertThat(sortedAssignments.get(p2), equalTo(expectedClientTwoAssignment)); + assertThat(sortedAssignments.get(p3), equalTo(expectedClientThreeAssignment)); + assertThat(sortedAssignments.get(p4), equalTo(expectedClientFourAssignment)); + } + @Test public void shouldNotHaveSameAssignmentOnAnyTwoHosts() { @@ -665,4 +703,21 @@ public class StickyTaskAssignorTest { } } + private Map> sortClientAssignments(final Map clients) { + final Map> sortedAssignments = new HashMap<>(); + for (final Map.Entry entry : clients.entrySet()) { + final Set sorted = new TreeSet<>(entry.getValue().activeTasks()); + sortedAssignments.put(entry.getKey(), sorted); + } + return sortedAssignments; + } + + private Set getExpectedTaskIdAssignment(final List tasks, final int... indices) { + final Set sortedAssignment = new TreeSet<>(); + for (final int index : indices) { + sortedAssignment.add(tasks.get(index)); + } + return sortedAssignment; + } + }