KAFKA-15045: (KIP-924 pt. 15) Implement #defaultStandbyTaskAssignment and finish rack-aware standby optimization (#16129)

This fills in the implementation details of the standby task assignment utility functions within TaskAssignmentUtils.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This commit is contained in:
Antoine Pourchet 2024-05-30 16:11:33 -06:00 committed by GitHub
parent 7c1bb1585f
commit 370e5ea1f8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 543 additions and 84 deletions

View File

@ -16,11 +16,13 @@
*/
package org.apache.kafka.streams.processor.assignment;
import static java.util.Collections.unmodifiableMap;
import java.time.Instant;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.streams.processor.TaskId;
@ -31,7 +33,7 @@ import org.apache.kafka.streams.processor.TaskId;
public class KafkaStreamsAssignment {
private final ProcessId processId;
private final Map<TaskId, AssignedTask> assignment;
private final Map<TaskId, AssignedTask> tasks;
private final Optional<Instant> followupRebalanceDeadline;
/**
@ -45,7 +47,8 @@ public class KafkaStreamsAssignment {
* @return a new KafkaStreamsAssignment object with the given processId and assignment
*/
public static KafkaStreamsAssignment of(final ProcessId processId, final Set<AssignedTask> assignment) {
return new KafkaStreamsAssignment(processId, assignment, Optional.empty());
final Map<TaskId, AssignedTask> tasks = assignment.stream().collect(Collectors.toMap(AssignedTask::id, Function.identity()));
return new KafkaStreamsAssignment(processId, tasks, Optional.empty());
}
/**
@ -62,14 +65,14 @@ public class KafkaStreamsAssignment {
* @return a new KafkaStreamsAssignment object with the same processId and assignment but with the given rebalanceDeadline
*/
public KafkaStreamsAssignment withFollowupRebalance(final Instant rebalanceDeadline) {
return new KafkaStreamsAssignment(this.processId(), this.assignment(), Optional.of(rebalanceDeadline));
return new KafkaStreamsAssignment(this.processId(), this.tasks(), Optional.of(rebalanceDeadline));
}
private KafkaStreamsAssignment(final ProcessId processId,
final Set<AssignedTask> assignment,
final Map<TaskId, AssignedTask> tasks,
final Optional<Instant> followupRebalanceDeadline) {
this.processId = processId;
this.assignment = assignment.stream().collect(Collectors.toMap(AssignedTask::id, t -> t));
this.tasks = tasks;
this.followupRebalanceDeadline = followupRebalanceDeadline;
}
@ -83,24 +86,18 @@ public class KafkaStreamsAssignment {
/**
*
* @return a set of assigned tasks that are part of this {@code KafkaStreamsAssignment}
* @return a read-only set of assigned tasks that are part of this {@code KafkaStreamsAssignment}
*/
public Set<AssignedTask> assignment() {
// TODO change assignment to return a map so we aren't forced to copy this into a Set
return new HashSet<>(assignment.values());
}
// TODO: merge this with #assignment by having it return a Map<TaskId, AssignedTask>
public Set<TaskId> assignedTaskIds() {
return assignment.keySet();
public Map<TaskId, AssignedTask> tasks() {
return unmodifiableMap(tasks);
}
public void assignTask(final AssignedTask newTask) {
assignment.put(newTask.id(), newTask);
tasks.put(newTask.id(), newTask);
}
public void removeTask(final AssignedTask removedTask) {
assignment.remove(removedTask.id());
tasks.remove(removedTask.id());
}
/**
@ -140,5 +137,25 @@ public class KafkaStreamsAssignment {
public Type type() {
return taskType;
}
@Override
public int hashCode() {
    // Combine id and task type with the conventional 31-multiplier scheme.
    int hash = 31 + id.hashCode();
    hash = 31 * hash + taskType.hashCode();
    return hash;
}
@Override
public boolean equals(final Object obj) {
    if (this == obj) {
        return true;
    }
    // Exact class match (not instanceof) keeps equals symmetric with subclasses.
    if (obj == null || getClass() != obj.getClass()) {
        return false;
    }
    final AssignedTask that = (AssignedTask) obj;
    return id.equals(that.id()) && taskType == that.taskType;
}
}
}

View File

@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams.processor.assignment;
import static org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor.STANDBY_OPTIMIZER_MAX_ITERATION;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@ -26,9 +28,15 @@ import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask;
import org.apache.kafka.streams.processor.internals.assignment.ConstrainedPrioritySet;
import org.apache.kafka.streams.processor.internals.assignment.Graph;
import org.apache.kafka.streams.processor.internals.assignment.MinTrafficGraphConstructor;
import org.apache.kafka.streams.processor.internals.assignment.RackAwareGraphConstructor;
@ -72,6 +80,27 @@ public final class TaskAssignmentUtils {
return assignments;
}
/**
* Assign standby tasks to KafkaStreams clients according to the default logic.
* <p>
* If rack-aware client tags are configured, the rack-aware standby task assignor will be used
*
* @param applicationState the metadata and other info describing the current application state
* @param kafkaStreamsAssignments the current assignment of tasks to KafkaStreams clients
*
* @return a new map containing the mappings from KafkaStreamsAssignments updated with the default standby assignment
*/
// Dispatches to one of the standby-task assignors based on the configured rack-awareness settings.
public static Map<ProcessId, KafkaStreamsAssignment> defaultStandbyTaskAssignment(final ApplicationState applicationState,
final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments) {
// Client tags configured: place standbys on clients with differing tag dimensions.
if (!applicationState.assignmentConfigs().rackAwareAssignmentTags().isEmpty()) {
return tagBasedStandbyTaskAssignment(applicationState, kafkaStreamsAssignments);
} else if (canPerformRackAwareOptimization(applicationState, AssignedTask.Type.STANDBY)) {
// NOTE(review): this branch invokes the tag-based assignor even though no client tags
// are configured (the empty-tags case was excluded above) — confirm this is intentional
// and not meant to be a different rack-aware/load-based path.
return tagBasedStandbyTaskAssignment(applicationState, kafkaStreamsAssignments);
} else {
// No tags and no usable rack information: balance standbys purely by client load.
return loadBasedStandbyTaskAssignment(applicationState, kafkaStreamsAssignments);
}
}
/**
* Optimize active task assignment for rack awareness. This optimization is based on the
* {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG trafficCost}
@ -98,11 +127,9 @@ public final class TaskAssignmentUtils {
*
* @return a map with the KafkaStreamsAssignments updated to minimize cross-rack traffic for active tasks
*/
public static Map<ProcessId, KafkaStreamsAssignment> optimizeRackAwareActiveTasks(
final ApplicationState applicationState,
final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments,
final SortedSet<TaskId> tasks
) {
public static Map<ProcessId, KafkaStreamsAssignment> optimizeRackAwareActiveTasks(final ApplicationState applicationState,
final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments,
final SortedSet<TaskId> tasks) {
if (tasks.isEmpty()) {
return kafkaStreamsAssignments;
}
@ -131,7 +158,7 @@ public final class TaskAssignmentUtils {
clientRacks.put(uuid, kafkaStreamsStates.get(entry.getKey()).rackId());
}
final long initialCost = computeInitialCost(
final long initialCost = computeTotalAssignmentCost(
topicPartitionsByTaskId,
taskIds,
clientIds,
@ -172,7 +199,7 @@ public final class TaskAssignmentUtils {
assignmentGraph.clientByTask,
(assignment, taskId) -> assignment.assignTask(new AssignedTask(taskId, AssignedTask.Type.ACTIVE)),
(assignment, taskId) -> assignment.removeTask(new AssignedTask(taskId, AssignedTask.Type.ACTIVE)),
(assignment, taskId) -> assignment.assignedTaskIds().contains(taskId)
(assignment, taskId) -> assignment.tasks().containsKey(taskId)
);
return kafkaStreamsAssignments;
@ -231,7 +258,7 @@ public final class TaskAssignmentUtils {
clientRacks.put(uuid, kafkaStreamsStates.get(entry.getKey()).rackId());
}
final long initialCost = computeInitialCost(
final long initialCost = computeTotalAssignmentCost(
topicPartitionsByTaskId,
taskIds,
clientIds,
@ -244,18 +271,112 @@ public final class TaskAssignmentUtils {
);
LOG.info("Assignment before standby task optimization has cost {}", initialCost);

final MoveStandbyTaskPredicate moveablePredicate = getStandbyTaskMovePredicate(applicationState);
// Computes the standby tasks on `source` that may legally move to `destination`: the task
// must be a standby, must not already be hosted by the destination, and must pass the
// configured move predicate (e.g. client-tag constraints).
final BiFunction<KafkaStreamsAssignment, KafkaStreamsAssignment, List<TaskId>> getMovableTasks = (source, destination) -> {
    return source.tasks().values().stream()
        .filter(task -> task.type() == AssignedTask.Type.STANDBY)
        .filter(task -> !destination.tasks().containsKey(task.id()))
        .filter(task -> {
            final KafkaStreamsState sourceState = kafkaStreamsStates.get(source.processId());
            // BUGFIX: look the destination state up by the destination's process id;
            // previously both lookups used source.processId().
            final KafkaStreamsState destinationState = kafkaStreamsStates.get(destination.processId());
            return moveablePredicate.canMoveStandbyTask(sourceState, destinationState, task.id(), kafkaStreamsAssignments);
        })
        .map(AssignedTask::id)
        .sorted()
        .collect(Collectors.toList());
};

final long startTime = System.currentTimeMillis();
boolean taskMoved = true;
int round = 0;
final RackAwareGraphConstructor<KafkaStreamsAssignment> graphConstructor = RackAwareGraphConstructorFactory.create(
    applicationState.assignmentConfigs().rackAwareAssignmentStrategy(), taskIds);
// Repeatedly attempt pairwise standby swaps between clients on different racks until a full
// pass makes no move, or the iteration cap is reached.
while (taskMoved && round < STANDBY_OPTIMIZER_MAX_ITERATION) {
    taskMoved = false;
    round++;
    for (int i = 0; i < kafkaStreamsAssignments.size(); i++) {
        final UUID clientId1 = clientIds.get(i);
        final KafkaStreamsAssignment clientState1 = kafkaStreamsAssignments.get(new ProcessId(clientId1));
        for (int j = i + 1; j < kafkaStreamsAssignments.size(); j++) {
            // BUGFIX: index with j, not i. Using i made clientId2 == clientId1, so the
            // same-rack check below skipped every pair and the optimizer never moved a task.
            final UUID clientId2 = clientIds.get(j);
            final KafkaStreamsAssignment clientState2 = kafkaStreamsAssignments.get(new ProcessId(clientId2));

            final String rack1 = clientRacks.get(clientState1.processId().id()).get();
            final String rack2 = clientRacks.get(clientState2.processId().id()).get();
            // Cross-rack traffic cannot be reduced if the racks are the same.
            if (rack1.equals(rack2)) {
                continue;
            }

            final List<TaskId> movable1 = getMovableTasks.apply(clientState1, clientState2);
            final List<TaskId> movable2 = getMovableTasks.apply(clientState2, clientState1);

            // There is no need to optimize if either side is empty, because the optimization
            // can only swap tasks to keep the clients' load balanced.
            if (movable1.isEmpty() || movable2.isEmpty()) {
                continue;
            }

            final List<TaskId> taskIdList = Stream.concat(movable1.stream(), movable2.stream())
                .sorted()
                .collect(Collectors.toList());
            final List<UUID> clients = Stream.of(clientId1, clientId2).sorted().collect(Collectors.toList());

            // Solve a min-cost flow over just this pair of clients and their movable tasks.
            final AssignmentGraph assignmentGraph = buildTaskGraph(
                assignmentsByUuid,
                clientRacks,
                taskIdList,
                clients,
                topicPartitionsByTaskId,
                crossRackTrafficCost,
                nonOverlapCost,
                false,
                false,
                graphConstructor
            );
            assignmentGraph.graph.solveMinCostFlow();

            taskMoved |= graphConstructor.assignTaskFromMinCostFlow(
                assignmentGraph.graph,
                clientIds,
                taskIds,
                assignmentsByUuid,
                assignmentGraph.taskCountByClient,
                assignmentGraph.clientByTask,
                (assignment, taskId) -> assignment.assignTask(new AssignedTask(taskId, AssignedTask.Type.STANDBY)),
                (assignment, taskId) -> assignment.removeTask(new AssignedTask(taskId, AssignedTask.Type.STANDBY)),
                (assignment, taskId) -> assignment.tasks().containsKey(taskId) && assignment.tasks().get(taskId).type() == AssignedTask.Type.STANDBY
            );
        }
    }
}

final long finalCost = computeTotalAssignmentCost(
    topicPartitionsByTaskId,
    taskIds,
    clientIds,
    assignmentsByUuid,
    clientRacks,
    crossRackTrafficCost,
    nonOverlapCost,
    true,
    true
);
final long duration = System.currentTimeMillis() - startTime;
LOG.info("Assignment after {} rounds and {} milliseconds for standby task optimization is {}\n with cost {}",
    round, duration, kafkaStreamsAssignments, finalCost);
return kafkaStreamsAssignments;
}
private static long computeInitialCost(final Map<TaskId, Set<TaskTopicPartition>> topicPartitionsByTaskId,
final List<TaskId> taskIds,
final List<UUID> clientIds,
final Map<UUID, KafkaStreamsAssignment> assignmentsByUuid,
final Map<UUID, Optional<String>> clientRacks,
final int crossRackTrafficCost,
final int nonOverlapCost,
final boolean hasReplica,
final boolean isStandby) {
private static long computeTotalAssignmentCost(final Map<TaskId, Set<TaskTopicPartition>> topicPartitionsByTaskId,
final List<TaskId> taskIds,
final List<UUID> clientIds,
final Map<UUID, KafkaStreamsAssignment> assignmentsByUuid,
final Map<UUID, Optional<String>> clientRacks,
final int crossRackTrafficCost,
final int nonOverlapCost,
final boolean hasReplica,
final boolean isStandby) {
if (taskIds.isEmpty()) {
return 0;
}
@ -296,7 +417,7 @@ public final class TaskAssignmentUtils {
assignmentsByUuid,
clientByTask,
taskCountByClient,
(assignment, taskId) -> assignment.assignedTaskIds().contains(taskId),
(assignment, taskId) -> assignment.tasks().containsKey(taskId),
(taskId, processId, inCurrentAssignment, unused0, unused1, unused2) -> {
final String clientRack = clientRacks.get(processId).get();
final int assignmentChangeCost = !inCurrentAssignment ? nonOverlapCost : 0;
@ -328,6 +449,14 @@ public final class TaskAssignmentUtils {
}
}
/**
 * Predicate deciding whether the given standby task may be moved from the {@code source}
 * KafkaStreams client to the {@code destination} client during standby-task optimization.
 */
@FunctionalInterface
public interface MoveStandbyTaskPredicate {
boolean canMoveStandbyTask(final KafkaStreamsState source,
final KafkaStreamsState destination,
final TaskId taskId,
final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignment);
}
/**
*
* @return the traffic cost of assigning this {@param task} to the client {@param streamsState}.
@ -354,7 +483,6 @@ public final class TaskAssignmentUtils {
final AssignedTask.Type taskType) {
final String rackAwareAssignmentStrategy = applicationState.assignmentConfigs().rackAwareAssignmentStrategy();
if (StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(rackAwareAssignmentStrategy)) {
LOG.warn("KafkaStreams rack aware task assignment optimization was disabled in the StreamsConfig.");
return false;
}
return hasValidRackInformation(applicationState, taskType);
@ -407,4 +535,317 @@ public final class TaskAssignmentUtils {
}
return true;
}
// Distributes standby replicas of each stateful task across clients whose configured client
// tags differ from those of the client(s) already hosting the task, falling back to the
// least-loaded clients for replicas that could not be placed on distinct tag dimensions.
// Mutates and returns the passed-in kafkaStreamsAssignments map.
private static Map<ProcessId, KafkaStreamsAssignment> tagBasedStandbyTaskAssignment(final ApplicationState applicationState,
final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments) {
final int numStandbyReplicas = applicationState.assignmentConfigs().numStandbyReplicas();
final Map<ProcessId, KafkaStreamsState> streamStates = applicationState.kafkaStreamsStates(false);
final Set<String> rackAwareAssignmentTags = new HashSet<>(applicationState.assignmentConfigs().rackAwareAssignmentTags());
// Precomputed lookups: tag key -> known values, and (key, value) -> clients carrying it.
final TagStatistics tagStatistics = new TagStatistics(applicationState);
// Clients prioritized by current load (tasks per processing thread).
final ConstrainedPrioritySet standbyTaskClientsByTaskLoad = standbyTaskPriorityListByLoad(streamStates, kafkaStreamsAssignments);
// Only stateful tasks receive standby replicas.
final Set<TaskId> statefulTaskIds = applicationState.allTasks().values().stream()
.filter(TaskInfo::isStateful)
.map(TaskInfo::id)
.collect(Collectors.toSet());
// Each stateful task initially owes the full configured number of standbys.
final Map<TaskId, Integer> tasksToRemainingStandbys = statefulTaskIds.stream()
.collect(Collectors.toMap(Function.identity(), t -> numStandbyReplicas));
final Map<UUID, KafkaStreamsAssignment> clientsByUuid = kafkaStreamsAssignments.entrySet().stream().collect(Collectors.toMap(
entry -> entry.getKey().id(),
Map.Entry::getValue
));
// Tasks whose standbys could not all be placed on distinct tag dimensions, keyed to the
// client hosting the corresponding active task.
final Map<TaskId, ProcessId> pendingStandbyTasksToClientId = new HashMap<>();
// For every client currently hosting the task, place its standbys on clients with
// different tag values than that host.
for (final TaskId statefulTaskId : statefulTaskIds) {
for (final KafkaStreamsAssignment assignment : clientsByUuid.values()) {
if (assignment.tasks().containsKey(statefulTaskId)) {
assignStandbyTasksToClientsWithDifferentTags(
numStandbyReplicas,
standbyTaskClientsByTaskLoad,
statefulTaskId,
assignment.processId(),
rackAwareAssignmentTags,
streamStates,
kafkaStreamsAssignments,
tasksToRemainingStandbys,
tagStatistics.tagKeyToValues,
tagStatistics.tagEntryToClients,
pendingStandbyTasksToClientId
);
}
}
}
// Any still-unplaced standbys are assigned purely by load, ignoring tags.
if (!tasksToRemainingStandbys.isEmpty()) {
assignPendingStandbyTasksToLeastLoadedClients(clientsByUuid,
numStandbyReplicas,
standbyTaskClientsByTaskLoad,
tasksToRemainingStandbys);
}
return kafkaStreamsAssignments;
}
// Assigns standby replicas for every stateful task to the least-loaded clients, with no
// rack or tag awareness. Mutates and returns the passed-in kafkaStreamsAssignments map.
private static Map<ProcessId, KafkaStreamsAssignment> loadBasedStandbyTaskAssignment(final ApplicationState applicationState,
                                                                                     final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments) {
    final int replicaCount = applicationState.assignmentConfigs().numStandbyReplicas();
    final Map<ProcessId, KafkaStreamsState> streamStates = applicationState.kafkaStreamsStates(false);

    // Only stateful tasks receive standby replicas.
    final Set<TaskId> statefulTaskIds = new HashSet<>();
    for (final TaskInfo taskInfo : applicationState.allTasks().values()) {
        if (taskInfo.isStateful()) {
            statefulTaskIds.add(taskInfo.id());
        }
    }

    // Each stateful task starts out owing the full configured number of standbys.
    final Map<TaskId, Integer> tasksToRemainingStandbys = new HashMap<>();
    for (final TaskId taskId : statefulTaskIds) {
        tasksToRemainingStandbys.put(taskId, replicaCount);
    }

    // Re-key the assignments by raw UUID for the priority-set helper below.
    final Map<UUID, KafkaStreamsAssignment> clients = new HashMap<>();
    for (final Map.Entry<ProcessId, KafkaStreamsAssignment> entry : kafkaStreamsAssignments.entrySet()) {
        clients.put(entry.getKey().id(), entry.getValue());
    }

    final ConstrainedPrioritySet standbyTaskClientsByTaskLoad = standbyTaskPriorityListByLoad(streamStates, kafkaStreamsAssignments);
    standbyTaskClientsByTaskLoad.offerAll(streamStates.keySet().stream().map(ProcessId::id).collect(Collectors.toSet()));

    for (final TaskId taskId : statefulTaskIds) {
        assignStandbyTasksForActiveTask(
            replicaCount,
            clients,
            tasksToRemainingStandbys,
            standbyTaskClientsByTaskLoad,
            taskId
        );
    }
    return kafkaStreamsAssignments;
}
// Places as many standby replicas of the given active task as capacity allows, always
// choosing the least-loaded eligible client next. Updates tasksToRemainingStandbys with
// the count that could not be placed and warns if any remain.
private static void assignStandbyTasksForActiveTask(final int numStandbyReplicas,
                                                    final Map<UUID, KafkaStreamsAssignment> clients,
                                                    final Map<TaskId, Integer> tasksToRemainingStandbys,
                                                    final ConstrainedPrioritySet standbyTaskClientsByTaskLoad,
                                                    final TaskId activeTaskId) {
    int remaining = tasksToRemainingStandbys.get(activeTaskId);
    while (remaining > 0) {
        final UUID candidate = standbyTaskClientsByTaskLoad.poll(activeTaskId);
        if (candidate == null) {
            // No eligible client left for this task.
            break;
        }
        clients.get(candidate).assignTask(new AssignedTask(activeTaskId, AssignedTask.Type.STANDBY));
        remaining--;
        // Return the client to the pool so it can still host other tasks' standbys.
        standbyTaskClientsByTaskLoad.offer(candidate);
    }
    tasksToRemainingStandbys.put(activeTaskId, remaining);

    if (remaining > 0) {
        LOG.warn("Unable to assign {} of {} standby tasks for task [{}]. " +
            "There is not enough available capacity. You should " +
            "increase the number of application instances " +
            "to maintain the requested number of standby replicas.",
            remaining, numStandbyReplicas, activeTaskId);
    }
}
// Places standby replicas of the given active task on clients whose client tags differ from
// those of the clients already used for this task, preferring less-loaded clients. Tasks
// whose standbys cannot all be placed this way are recorded in pendingStandbyTasksToClientId
// and left in tasksToRemainingStandbys for a later load-based fallback pass.
private static void assignStandbyTasksToClientsWithDifferentTags(final int numberOfStandbyClients,
final ConstrainedPrioritySet standbyTaskClientsByTaskLoad,
final TaskId activeTaskId,
final ProcessId activeTaskClient,
final Set<String> rackAwareAssignmentTags,
final Map<ProcessId, KafkaStreamsState> clientStates,
final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments,
final Map<TaskId, Integer> tasksToRemainingStandbys,
final Map<String, Set<String>> tagKeyToValues,
final Map<KeyValue<String, String>, Set<ProcessId>> tagEntryToClients,
final Map<TaskId, ProcessId> pendingStandbyTasksToClientId) {
standbyTaskClientsByTaskLoad.offerAll(clientStates.keySet().stream()
.map(ProcessId::id).collect(Collectors.toSet()));
// Start the used-client count at 1: the client hosting the active task counts as used.
int countOfUsedClients = 1;
int numRemainingStandbys = tasksToRemainingStandbys.get(activeTaskId);
// Tracks, per (tag key, tag value) entry, the clients that are excluded because a client
// with that tag value has already been used for this task.
final Map<KeyValue<String, String>, Set<ProcessId>> tagEntryToUsedClients = new HashMap<>();
ProcessId lastUsedClient = activeTaskClient;
do {
// Mark all clients sharing a tag entry with the most recently used client as used.
updateClientsOnAlreadyUsedTagEntries(
clientStates.get(lastUsedClient),
countOfUsedClients,
rackAwareAssignmentTags,
tagEntryToClients,
tagKeyToValues,
tagEntryToUsedClients
);
// Pick the least-loaded client that is not excluded by any used tag entry.
final UUID clientOnUnusedTagDimensions = standbyTaskClientsByTaskLoad.poll(
activeTaskId, uuid -> !isClientUsedOnAnyOfTheTagEntries(new ProcessId(uuid), tagEntryToUsedClients)
);
if (clientOnUnusedTagDimensions == null) {
break;
}
final KafkaStreamsState clientStateOnUsedTagDimensions = clientStates.get(new ProcessId(clientOnUnusedTagDimensions));
countOfUsedClients++;
numRemainingStandbys--;
LOG.debug("Assigning {} out of {} standby tasks for an active task [{}] with client tags {}. " +
"Standby task client tags are {}.",
numberOfStandbyClients - numRemainingStandbys, numberOfStandbyClients, activeTaskId,
clientStates.get(activeTaskClient).clientTags(),
clientStateOnUsedTagDimensions.clientTags());
kafkaStreamsAssignments.get(clientStateOnUsedTagDimensions.processId()).assignTask(
new AssignedTask(activeTaskId, AssignedTask.Type.STANDBY)
);
lastUsedClient = new ProcessId(clientOnUnusedTagDimensions);
} while (numRemainingStandbys > 0);
if (numRemainingStandbys > 0) {
// Remember the active host and the deficit so the caller can fall back to
// load-based placement for the remaining standbys.
pendingStandbyTasksToClientId.put(activeTaskId, activeTaskClient);
tasksToRemainingStandbys.put(activeTaskId, numRemainingStandbys);
LOG.warn("Rack aware standby task assignment was not able to assign {} of {} standby tasks for the " +
"active task [{}] with the rack aware assignment tags {}. " +
"This may happen when there aren't enough application instances on different tag " +
"dimensions compared to an active and corresponding standby task. " +
"Consider launching application instances on different tag dimensions than [{}]. " +
"Standby task assignment will fall back to assigning standby tasks to the least loaded clients.",
numRemainingStandbys, numberOfStandbyClients,
activeTaskId, rackAwareAssignmentTags,
clientStates.get(activeTaskClient).clientTags());
} else {
tasksToRemainingStandbys.remove(activeTaskId);
}
}
// True when any tag entry already lists this client among its used clients.
private static boolean isClientUsedOnAnyOfTheTagEntries(final ProcessId client,
                                                        final Map<KeyValue<String, String>, Set<ProcessId>> tagEntryToUsedClients) {
    for (final Set<ProcessId> usedClients : tagEntryToUsedClients.values()) {
        if (usedClients.contains(client)) {
            return true;
        }
    }
    return false;
}
// Updates the exclusion map after `usedClient` received a replica of the current task:
// for each of its configured tag entries, mark every client sharing that (key, value) as
// used — unless all values of that tag key are already exhausted, in which case the key
// stops constraining placement and its entries are cleared.
private static void updateClientsOnAlreadyUsedTagEntries(final KafkaStreamsState usedClient,
final int countOfUsedClients,
final Set<String> rackAwareAssignmentTags,
final Map<KeyValue<String, String>, Set<ProcessId>> tagEntryToClients,
final Map<String, Set<String>> tagKeyToValues,
final Map<KeyValue<String, String>, Set<ProcessId>> tagEntryToUsedClients) {
final Map<String, String> usedClientTags = usedClient.clientTags();
for (final Map.Entry<String, String> usedClientTagEntry : usedClientTags.entrySet()) {
final String tagKey = usedClientTagEntry.getKey();
// Tags not named in the rack-awareness config are ignored for placement.
if (!rackAwareAssignmentTags.contains(tagKey)) {
LOG.warn("Client tag with key [{}] will be ignored when computing rack aware standby " +
"task assignment because it is not part of the configured rack awareness [{}].",
tagKey, rackAwareAssignmentTags);
continue;
}
final Set<String> allTagValues = tagKeyToValues.get(tagKey);
if (allTagValues.size() <= countOfUsedClients) {
// Every value of this tag key has been used at least once; the dimension can no
// longer discriminate, so drop its exclusions entirely.
allTagValues.forEach(tagValue -> tagEntryToUsedClients.remove(new KeyValue<>(tagKey, tagValue)));
} else {
// Exclude all clients carrying the same (key, value) as the used client.
final String tagValue = usedClientTagEntry.getValue();
final KeyValue<String, String> tagEntry = new KeyValue<>(tagKey, tagValue);
final Set<ProcessId> clientsOnUsedTagValue = tagEntryToClients.get(tagEntry);
tagEntryToUsedClients.put(tagEntry, clientsOnUsedTagValue);
}
}
}
// Returns the predicate used by the standby optimizer to decide whether a standby task may
// move between two clients. With client tags configured, a move is allowed only if the
// destination side covers at least as many distinct (tag key, tag value) entries as the
// source side; without tags, every move is allowed.
private static MoveStandbyTaskPredicate getStandbyTaskMovePredicate(final ApplicationState applicationState) {
final boolean hasRackAwareAssignmentTags = !applicationState.assignmentConfigs().rackAwareAssignmentTags().isEmpty();
if (hasRackAwareAssignmentTags) {
// Helper: add all of a client's (tag key, tag value) pairs into the given set.
final BiConsumer<KafkaStreamsState, Set<KeyValue<String, String>>> addTags = (cs, tagSet) -> {
final Map<String, String> tags = cs.clientTags();
if (tags != null) {
tagSet.addAll(tags.entrySet().stream()
.map(entry -> KeyValue.pair(entry.getKey(), entry.getValue()))
.collect(Collectors.toList())
);
}
};
final Map<ProcessId, KafkaStreamsState> clients = applicationState.kafkaStreamsStates(false);
return (source, destination, sourceTask, kafkaStreamsAssignments) -> {
final Set<KeyValue<String, String>> tagsWithSource = new HashSet<>();
final Set<KeyValue<String, String>> tagsWithDestination = new HashSet<>();
// Clients other than source/destination that also host this task contribute their
// tags to BOTH sides, since they remain hosts regardless of the move.
for (final KafkaStreamsAssignment assignment: kafkaStreamsAssignments.values()) {
final boolean hasAssignedTask = assignment.tasks().containsKey(sourceTask);
final boolean isSourceProcess = assignment.processId().equals(source.processId());
final boolean isDestinationProcess = assignment.processId().equals(destination.processId());
if (hasAssignedTask && !isSourceProcess && !isDestinationProcess) {
final KafkaStreamsState clientState = clients.get(assignment.processId());
addTags.accept(clientState, tagsWithSource);
addTags.accept(clientState, tagsWithDestination);
}
}
addTags.accept(source, tagsWithSource);
addTags.accept(destination, tagsWithDestination);
// Allow the move only if it does not reduce tag-dimension coverage.
return tagsWithDestination.size() >= tagsWithSource.size();
};
} else {
return (a, b, c, d) -> true;
}
}
// Builds a priority set that orders clients by load (assigned tasks per processing thread);
// a client is constrained away from a task it already hosts.
private static ConstrainedPrioritySet standbyTaskPriorityListByLoad(final Map<ProcessId, KafkaStreamsState> streamStates,
                                                                    final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments) {
    return new ConstrainedPrioritySet(
        (processId, taskId) -> {
            final KafkaStreamsAssignment assignment = kafkaStreamsAssignments.get(new ProcessId(processId));
            return assignment.tasks().containsKey(taskId);
        },
        processId -> {
            final ProcessId id = new ProcessId(processId);
            final double threadCount = streamStates.get(id).numProcessingThreads();
            final double assignedTaskCount = kafkaStreamsAssignments.get(id).tasks().size();
            return assignedTaskCount / threadCount;
        }
    );
}
// Fallback pass: for every task that still owes standbys after the tag-aware pass, place
// the remaining replicas purely by client load.
private static void assignPendingStandbyTasksToLeastLoadedClients(final Map<UUID, KafkaStreamsAssignment> clients,
                                                                  final int numStandbyReplicas,
                                                                  final ConstrainedPrioritySet standbyTaskClientsByTaskLoad,
                                                                  final Map<TaskId, Integer> pendingStandbyTaskToNumberRemainingStandbys) {
    // Re-offer every client so the priority set reflects the current load before retrying.
    standbyTaskClientsByTaskLoad.offerAll(clients.keySet());
    for (final TaskId activeTaskId : pendingStandbyTaskToNumberRemainingStandbys.keySet()) {
        assignStandbyTasksForActiveTask(
            numStandbyReplicas,
            clients,
            pendingStandbyTaskToNumberRemainingStandbys,
            standbyTaskClientsByTaskLoad,
            activeTaskId
        );
    }
}
// Aggregated view of all clients' tags: which values exist per tag key, and which clients
// carry each (key, value) entry. Used by the tag-based standby assignor.
private static class TagStatistics {
// Tag key -> set of all values observed for that key across clients.
private final Map<String, Set<String>> tagKeyToValues;
// (Tag key, tag value) -> set of clients configured with that entry.
private final Map<KeyValue<String, String>, Set<ProcessId>> tagEntryToClients;
// NOTE(review): this constructor appears unused in the visible code — confirm against the
// rest of the file before removing.
private TagStatistics(final Map<String, Set<String>> tagKeyToValues,
final Map<KeyValue<String, String>, Set<ProcessId>> tagEntryToClients) {
this.tagKeyToValues = tagKeyToValues;
this.tagEntryToClients = tagEntryToClients;
}
// Builds both lookups by scanning every client's tags once.
public TagStatistics(final ApplicationState applicationState) {
final Map<ProcessId, KafkaStreamsState> clientStates = applicationState.kafkaStreamsStates(false);
final Map<String, Set<String>> tagKeyToValues = new HashMap<>();
final Map<KeyValue<String, String>, Set<ProcessId>> tagEntryToClients = new HashMap<>();
for (final KafkaStreamsState state : clientStates.values()) {
state.clientTags().forEach((tagKey, tagValue) -> {
tagKeyToValues.computeIfAbsent(tagKey, ignored -> new HashSet<>()).add(tagValue);
tagEntryToClients.computeIfAbsent(new KeyValue<>(tagKey, tagValue), ignored -> new HashSet<>()).add(state.processId());
});
}
this.tagKeyToValues = tagKeyToValues;
this.tagEntryToClients = tagEntryToClients;
}
}
}

View File

@ -278,7 +278,7 @@ public class StickyTaskAssignor implements TaskAssignor {
for (final Map.Entry<ProcessId, KafkaStreamsAssignment> entry : optimizedAssignments.entrySet()) {
final ProcessId processId = entry.getKey();
final Set<AssignedTask> assignedTasks = optimizedAssignments.get(processId).assignment();
final Set<AssignedTask> assignedTasks = new HashSet<>(optimizedAssignments.get(processId).tasks().values());
newAssignments.put(processId, assignedTasks);
for (final AssignedTask task : assignedTasks) {

View File

@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
import java.time.Instant;
import java.util.Optional;
import java.util.function.Function;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
@ -555,18 +556,21 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
RackUtils.annotateTopicPartitionsWithRackInfo(cluster, internalTopicManager, allTopicPartitions);
final Set<TaskInfo> logicalTasks = logicalTaskIds.stream().map(taskId -> {
final Set<String> stateStoreNames = topologyMetadata
.stateStoreNameToSourceTopicsForTopology(taskId.topologyName())
.keySet();
final Set<TaskTopicPartition> topicPartitions = topicPartitionsForTask.get(taskId);
return new DefaultTaskInfo(
taskId,
!stateStoreNames.isEmpty(),
stateStoreNames,
topicPartitions
);
}).collect(Collectors.toSet());
final Map<TaskId, TaskInfo> logicalTasks = logicalTaskIds.stream().collect(Collectors.toMap(
Function.identity(),
taskId -> {
final Set<String> stateStoreNames = topologyMetadata
.stateStoreNameToSourceTopicsForTopology(taskId.topologyName())
.keySet();
final Set<TaskTopicPartition> topicPartitions = topicPartitionsForTask.get(taskId);
return new DefaultTaskInfo(
taskId,
!stateStoreNames.isEmpty(),
stateStoreNames,
topicPartitions
);
}
));
return new DefaultApplicationState(
assignmentConfigs.toPublicAssignmentConfigs(),
@ -728,12 +732,12 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
* populate the stateful tasks that have been assigned to the clients
*/
private UserTaskAssignmentListener assignTasksToClients(final Cluster fullMetadata,
final Set<String> allSourceTopics,
final Map<Subtopology, TopicsInfo> topicGroups,
final Map<UUID, ClientMetadata> clientMetadataMap,
final Map<TaskId, Set<TopicPartition>> partitionsForTask,
final Map<UUID, Map<String, Optional<String>>> racksForProcessConsumer,
final Set<TaskId> statefulTasks) {
final Set<String> allSourceTopics,
final Map<Subtopology, TopicsInfo> topicGroups,
final Map<UUID, ClientMetadata> clientMetadataMap,
final Map<TaskId, Set<TopicPartition>> partitionsForTask,
final Map<UUID, Map<String, Optional<String>>> racksForProcessConsumer,
final Set<TaskId> statefulTasks) {
if (!statefulTasks.isEmpty()) {
throw new TaskAssignmentException("The stateful tasks should not be populated before assigning tasks to clients");
}
@ -775,7 +779,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
final Optional<org.apache.kafka.streams.processor.assignment.TaskAssignor> userTaskAssignor =
userTaskAssignorSupplier.get();
UserTaskAssignmentListener userTaskAssignmentListener = (GroupAssignment assignment, GroupSubscription subscription) -> { };
final UserTaskAssignmentListener userTaskAssignmentListener;
if (userTaskAssignor.isPresent()) {
final ApplicationState applicationState = buildApplicationState(
taskManager.topologyMetadata(),
@ -785,12 +789,11 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
);
final org.apache.kafka.streams.processor.assignment.TaskAssignor assignor = userTaskAssignor.get();
final TaskAssignment taskAssignment = assignor.assign(applicationState);
processStreamsPartitionAssignment(clientMetadataMap, taskAssignment);
final AssignmentError assignmentError = validateTaskAssignment(applicationState, taskAssignment);
userTaskAssignmentListener = (GroupAssignment assignment, GroupSubscription subscription) -> {
assignor.onAssignmentComputed(assignment, subscription, assignmentError);
};
processStreamsPartitionAssignment(clientMetadataMap, taskAssignment);
userTaskAssignmentListener = (assignment, subscription) -> assignor.onAssignmentComputed(assignment, subscription, assignmentError);
} else {
userTaskAssignmentListener = (assignment, subscription) -> { };
final TaskAssignor taskAssignor = createTaskAssignor(lagComputationSuccessful);
final RackAwareTaskAssignor rackAwareTaskAssignor = new RackAwareTaskAssignor(
fullMetadata,
@ -1564,7 +1567,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
final Map<TaskId, ProcessId> standbyTasksInOutput = new HashMap<>();
for (final KafkaStreamsAssignment assignment : assignments) {
final Set<TaskId> tasksForAssignment = new HashSet<>();
for (final KafkaStreamsAssignment.AssignedTask task : assignment.assignment()) {
for (final KafkaStreamsAssignment.AssignedTask task : assignment.tasks().values()) {
if (activeTasksInOutput.containsKey(task.id()) && task.type() == KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
log.error("Assignment is invalid: active task {} was assigned to multiple KafkaStreams clients: {} and {}",
task.id(), assignment.processId().id(), activeTasksInOutput.get(task.id()).id());
@ -1614,7 +1617,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
final Set<TaskId> taskIdsInInput = applicationState.allTasks().keySet();
for (final KafkaStreamsAssignment assignment : assignments) {
for (final KafkaStreamsAssignment.AssignedTask task : assignment.assignment()) {
for (final KafkaStreamsAssignment.AssignedTask task : assignment.tasks().values()) {
if (!taskIdsInInput.contains(task.id())) {
log.error("Assignment is invalid: task {} assigned to KafkaStreams client {} was unknown",
task.id(), assignment.processId().id());

View File

@ -85,8 +85,8 @@ public class ClientState {
}
ClientState(final UUID processId, final int capacity, final Map<String, String> clientTags) {
previousStandbyTasks.taskIds(new TreeSet<>());
previousActiveTasks.taskIds(new TreeSet<>());
previousStandbyTasks.setTaskIds(new TreeSet<>());
previousActiveTasks.setTaskIds(new TreeSet<>());
taskOffsetSums = new TreeMap<>();
taskLagTotals = new TreeMap<>();
this.capacity = capacity;
@ -110,8 +110,8 @@ public class ClientState {
final Map<String, String> clientTags,
final int capacity,
final UUID processId) {
this.previousStandbyTasks.taskIds(unmodifiableSet(new TreeSet<>(previousStandbyTasks)));
this.previousActiveTasks.taskIds(unmodifiableSet(new TreeSet<>(previousActiveTasks)));
this.previousStandbyTasks.setTaskIds(unmodifiableSet(new TreeSet<>(previousStandbyTasks)));
this.previousActiveTasks.setTaskIds(unmodifiableSet(new TreeSet<>(previousActiveTasks)));
taskOffsetSums = emptyMap();
this.taskLagTotals = unmodifiableMap(taskLagTotals);
this.capacity = capacity;
@ -489,14 +489,14 @@ public class ClientState {
}
public void setAssignedTasks(final KafkaStreamsAssignment assignment) {
final Set<TaskId> activeTasks = assignment.assignment().stream()
final Set<TaskId> activeTasks = assignment.tasks().values().stream()
.filter(task -> task.type() == ACTIVE).map(KafkaStreamsAssignment.AssignedTask::id)
.collect(Collectors.toSet());
final Set<TaskId> standbyTasks = assignment.assignment().stream()
final Set<TaskId> standbyTasks = assignment.tasks().values().stream()
.filter(task -> task.type() == STANDBY).map(KafkaStreamsAssignment.AssignedTask::id)
.collect(Collectors.toSet());
assignedActiveTasks.taskIds(activeTasks);
assignedStandbyTasks.taskIds(standbyTasks);
assignedActiveTasks.setTaskIds(activeTasks);
assignedStandbyTasks.setTaskIds(standbyTasks);
}
public String currentAssignment() {

View File

@ -31,7 +31,7 @@ class ClientStateTask {
this.consumerToTaskIds = consumerToTaskIds;
}
void taskIds(final Set<TaskId> clientToTaskIds) {
void setTaskIds(final Set<TaskId> clientToTaskIds) {
taskIds = clientToTaskIds;
}

View File

@ -30,14 +30,14 @@ import java.util.function.Function;
/**
* Wraps a priority queue of clients and returns the next valid candidate(s) based on the current task assignment
*/
class ConstrainedPrioritySet {
public class ConstrainedPrioritySet {
private final PriorityQueue<UUID> clientsByTaskLoad;
private final BiFunction<UUID, TaskId, Boolean> constraint;
private final Set<UUID> uniqueClients = new HashSet<>();
ConstrainedPrioritySet(final BiFunction<UUID, TaskId, Boolean> constraint,
final Function<UUID, Double> weight) {
public ConstrainedPrioritySet(final BiFunction<UUID, TaskId, Boolean> constraint,
final Function<UUID, Double> weight) {
this.constraint = constraint;
clientsByTaskLoad = new PriorityQueue<>(Comparator.comparing(weight).thenComparing(clientId -> clientId));
}
@ -45,7 +45,7 @@ class ConstrainedPrioritySet {
/**
* @return the next least loaded client that satisfies the given criteria, or null if none do
*/
UUID poll(final TaskId task, final Function<UUID, Boolean> extraConstraint) {
public UUID poll(final TaskId task, final Function<UUID, Boolean> extraConstraint) {
final Set<UUID> invalidPolledClients = new HashSet<>();
while (!clientsByTaskLoad.isEmpty()) {
final UUID candidateClient = pollNextClient();
@ -66,17 +66,17 @@ class ConstrainedPrioritySet {
/**
* @return the next least loaded client that satisfies the given criteria, or null if none do
*/
UUID poll(final TaskId task) {
public UUID poll(final TaskId task) {
return poll(task, client -> true);
}
void offerAll(final Collection<UUID> clients) {
public void offerAll(final Collection<UUID> clients) {
for (final UUID client : clients) {
offer(client);
}
}
void offer(final UUID client) {
public void offer(final UUID client) {
if (uniqueClients.contains(client)) {
clientsByTaskLoad.remove(client);
} else {

View File

@ -21,9 +21,7 @@ import static java.util.Collections.unmodifiableMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.kafka.streams.processor.assignment.TaskInfo;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.ClientMetadata;
import org.apache.kafka.streams.processor.TaskId;
@ -42,10 +40,10 @@ public class DefaultApplicationState implements ApplicationState {
private final Map<Boolean, Map<ProcessId, KafkaStreamsState>> cachedKafkaStreamStates;
public DefaultApplicationState(final AssignmentConfigs assignmentConfigs,
final Set<TaskInfo> tasks,
final Map<TaskId, TaskInfo> tasks,
final Map<UUID, ClientMetadata> clientStates) {
this.assignmentConfigs = assignmentConfigs;
this.tasks = unmodifiableMap(tasks.stream().collect(Collectors.toMap(TaskInfo::id, task -> task)));
this.tasks = unmodifiableMap(tasks);
this.clientStates = clientStates;
this.cachedKafkaStreamStates = new HashMap<>();
}

View File

@ -77,7 +77,7 @@ public class RackAwareTaskAssignor {
// This is number is picked based on testing. Usually the optimization for standby assignment
// stops after 3 rounds
private static final int STANDBY_OPTIMIZER_MAX_ITERATION = 4;
public static final int STANDBY_OPTIMIZER_MAX_ITERATION = 4;
private final Cluster fullMetadata;
private final Map<TaskId, Set<TopicPartition>> partitionsForTask;