KAFKA-6718 / Rack aware standby task assignor (#10851)

This PR is part of KIP-708 and adds rack aware standby task assignment logic.
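
For context, here is a minimal sketch of how an operator would configure a Kafka Streams instance to use this feature, assuming the `client.tag.` prefix and `rack.aware.assignment.tags` property names described in KIP-708 (the public StreamsConfig constants for them are not part of this PR, which still hard-codes the tag list to an empty list in AssignorConfiguration):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

// Illustrative sketch only (hypothetical class): tags one Streams instance with its zone and
// cluster, and tells the assignor to spread standby tasks over those two tag dimensions.
public class RackAwareStandbyConfigExample {
    public static Properties streamsProperties() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        // Tags describing where this instance runs ("client.tag." prefix per KIP-708).
        props.put("client.tag.zone", "eu-central-1a");
        props.put("client.tag.cluster", "k8s-cluster1");
        // Tag keys the assignor should use for rack aware standby placement (per KIP-708).
        props.put("rack.aware.assignment.tags", "zone,cluster");
        return props;
    }
}

With instances that differ only in client.tag.zone, the new ClientTagAwareStandbyTaskAssignor tries, on a best-effort basis, to place each task's standby on an instance in a different zone than its active task.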

Reviewers: Bruno Cadonna <cadonna@apache.org>, Luke Chen <showuon@gmail.com>, Vladimir Sitnikov <vladimirsitnikov@apache.org>
Levani Kokhreidze 2022-03-03 05:37:26 +02:00 committed by GitHub
parent 07553d13f7
commit 62e646619b
16 changed files with 1448 additions and 122 deletions


@ -29,6 +29,8 @@ import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.slf4j.Logger;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.apache.kafka.common.utils.Utils.getHost;
@ -241,22 +243,26 @@ public final class AssignorConfiguration {
public final int maxWarmupReplicas;
public final int numStandbyReplicas;
public final long probingRebalanceIntervalMs;
public final List<String> rackAwareAssignmentTags;
private AssignmentConfigs(final StreamsConfig configs) {
acceptableRecoveryLag = configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG);
maxWarmupReplicas = configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG);
numStandbyReplicas = configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
probingRebalanceIntervalMs = configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG);
rackAwareAssignmentTags = Collections.emptyList();
}
AssignmentConfigs(final Long acceptableRecoveryLag,
final Integer maxWarmupReplicas,
final Integer numStandbyReplicas,
final Long probingRebalanceIntervalMs) {
final Long probingRebalanceIntervalMs,
final List<String> rackAwareAssignmentTags) {
this.acceptableRecoveryLag = validated(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, acceptableRecoveryLag);
this.maxWarmupReplicas = validated(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, maxWarmupReplicas);
this.numStandbyReplicas = validated(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, numStandbyReplicas);
this.probingRebalanceIntervalMs = validated(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, probingRebalanceIntervalMs);
this.rackAwareAssignmentTags = rackAwareAssignmentTags;
}
private static <T> T validated(final String configKey, final T value) {


@ -23,6 +23,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Map;
@ -46,6 +47,7 @@ public class ClientState {
private static final Logger LOG = LoggerFactory.getLogger(ClientState.class);
public static final Comparator<TopicPartition> TOPIC_PARTITION_COMPARATOR = comparing(TopicPartition::topic).thenComparing(TopicPartition::partition);
private final Map<String, String> clientTags;
private final Map<TaskId, Long> taskOffsetSums; // contains only stateful tasks we previously owned
private final Map<TaskId, Long> taskLagTotals; // contains lag for all stateful tasks in the app topology
private final Map<TopicPartition, String> ownedPartitions = new TreeMap<>(TOPIC_PARTITION_COMPARATOR);
@ -64,24 +66,30 @@ public class ClientState {
}
ClientState(final int capacity) {
this(capacity, Collections.emptyMap());
}
ClientState(final int capacity, final Map<String, String> clientTags) {
previousStandbyTasks.taskIds(new TreeSet<>());
previousActiveTasks.taskIds(new TreeSet<>());
taskOffsetSums = new TreeMap<>();
taskLagTotals = new TreeMap<>();
this.capacity = capacity;
this.clientTags = unmodifiableMap(clientTags);
}
// For testing only
public ClientState(final Set<TaskId> previousActiveTasks,
final Set<TaskId> previousStandbyTasks,
final Map<TaskId, Long> taskLagTotals,
final Map<String, String> clientTags,
final int capacity) {
this.previousStandbyTasks.taskIds(unmodifiableSet(new TreeSet<>(previousStandbyTasks)));
this.previousActiveTasks.taskIds(unmodifiableSet(new TreeSet<>(previousActiveTasks)));
taskOffsetSums = emptyMap();
this.taskLagTotals = unmodifiableMap(taskLagTotals);
this.capacity = capacity;
this.clientTags = unmodifiableMap(clientTags);
}
int capacity() {
@ -266,6 +274,10 @@ public class ClientState {
return ownedPartitions.get(partition);
}
public Map<String, String> clientTags() {
return clientTags;
}
public void addOwnedPartitions(final Collection<TopicPartition> ownedPartitions, final String consumer) {
for (final TopicPartition tp : ownedPartitions) {
this.ownedPartitions.put(tp, consumer);


@ -0,0 +1,337 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals.assignment;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import static org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.computeTasksToRemainingStandbys;
import static org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.createLeastLoadedPrioritySetConstrainedByAssignedTask;
import static org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks;
/**
* Distributes standby tasks over different tag dimensions. Standby task distribution is on a best-effort basis.
* If rack aware standby task assignment is not possible, the implementation falls back to distributing standby tasks on the least-loaded clients.
*
* @see DefaultStandbyTaskAssignor
*/
class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor {
private static final Logger log = LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class);
/**
* The algorithm distributes standby tasks for the {@code statefulTaskIds} over different tag dimensions.
* For each stateful task, the number of standby tasks will be assigned based on the configured {@link AssignmentConfigs#numStandbyReplicas}.
* Rack aware standby tasks distribution only takes into account tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags}.
* Ideally, all standby tasks for any given stateful task will be located on different tag dimensions to have the best possible distribution.
* However, if the ideal (or partially ideal) distribution is impossible, the algorithm will fall back to the least-loaded clients without taking rack awareness constraints into consideration.
* The least-loaded clients are determined based on the total number of tasks (active and standby tasks) assigned to the client.
*/
@Override
public boolean assign(final Map<UUID, ClientState> clients,
final Set<TaskId> allTaskIds,
final Set<TaskId> statefulTaskIds,
final AssignorConfiguration.AssignmentConfigs configs) {
final int numStandbyReplicas = configs.numStandbyReplicas;
final Set<String> rackAwareAssignmentTags = new HashSet<>(configs.rackAwareAssignmentTags);
final Map<TaskId, Integer> tasksToRemainingStandbys = computeTasksToRemainingStandbys(
numStandbyReplicas,
allTaskIds
);
final Map<String, Set<String>> tagKeyToValues = new HashMap<>();
final Map<TagEntry, Set<UUID>> tagEntryToClients = new HashMap<>();
fillClientsTagStatistics(clients, tagEntryToClients, tagKeyToValues);
final ConstrainedPrioritySet standbyTaskClientsByTaskLoad = createLeastLoadedPrioritySetConstrainedByAssignedTask(clients);
final Map<TaskId, UUID> pendingStandbyTasksToClientId = new HashMap<>();
for (final TaskId statefulTaskId : statefulTaskIds) {
for (final Map.Entry<UUID, ClientState> entry : clients.entrySet()) {
final UUID clientId = entry.getKey();
final ClientState clientState = entry.getValue();
if (clientState.activeTasks().contains(statefulTaskId)) {
assignStandbyTasksToClientsWithDifferentTags(
standbyTaskClientsByTaskLoad,
statefulTaskId,
clientId,
rackAwareAssignmentTags,
clients,
tasksToRemainingStandbys,
tagKeyToValues,
tagEntryToClients,
pendingStandbyTasksToClientId
);
}
}
}
if (!tasksToRemainingStandbys.isEmpty()) {
log.debug("Rack aware standby task assignment was not able to assign all standby tasks. " +
"tasksToRemainingStandbys=[{}], pendingStandbyTasksToClientId=[{}]. " +
"Will distribute the remaining standby tasks to least loaded clients.",
tasksToRemainingStandbys, pendingStandbyTasksToClientId);
assignPendingStandbyTasksToLeastLoadedClients(clients,
numStandbyReplicas,
rackAwareAssignmentTags,
standbyTaskClientsByTaskLoad,
tasksToRemainingStandbys,
pendingStandbyTasksToClientId);
}
// returning false, because standby task assignment will never require a follow-up probing rebalance.
return false;
}
private static void assignPendingStandbyTasksToLeastLoadedClients(final Map<UUID, ClientState> clients,
final int numStandbyReplicas,
final Set<String> rackAwareAssignmentTags,
final ConstrainedPrioritySet standbyTaskClientsByTaskLoad,
final Map<TaskId, Integer> pendingStandbyTaskToNumberRemainingStandbys,
final Map<TaskId, UUID> pendingStandbyTaskToClientId) {
// We need to re-offer all the clients to find the least-loaded ones
standbyTaskClientsByTaskLoad.offerAll(clients.keySet());
for (final Entry<TaskId, Integer> pendingStandbyTaskAssignmentEntry : pendingStandbyTaskToNumberRemainingStandbys.entrySet()) {
final TaskId activeTaskId = pendingStandbyTaskAssignmentEntry.getKey();
final UUID clientId = pendingStandbyTaskToClientId.get(activeTaskId);
final int numberOfRemainingStandbys = pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks(
clients,
pendingStandbyTaskToNumberRemainingStandbys,
standbyTaskClientsByTaskLoad,
activeTaskId
);
if (numberOfRemainingStandbys > 0) {
log.warn("Unable to assign {} of {} standby tasks for task [{}] with client tags [{}]. " +
"There is not enough available capacity. You should " +
"increase the number of application instances " +
"on different client tag dimensions " +
"to maintain the requested number of standby replicas. " +
"Rack awareness is configured with [{}] tags.",
numberOfRemainingStandbys, numStandbyReplicas, activeTaskId,
clients.get(clientId).clientTags(), rackAwareAssignmentTags);
}
}
}
@Override
public boolean isAllowedTaskMovement(final ClientState source, final ClientState destination) {
final Map<String, String> sourceClientTags = source.clientTags();
final Map<String, String> destinationClientTags = destination.clientTags();
for (final Entry<String, String> sourceClientTagEntry : sourceClientTags.entrySet()) {
if (!sourceClientTagEntry.getValue().equals(destinationClientTags.get(sourceClientTagEntry.getKey()))) {
return false;
}
}
return true;
}
// Visible for testing
static void fillClientsTagStatistics(final Map<UUID, ClientState> clientStates,
final Map<TagEntry, Set<UUID>> tagEntryToClients,
final Map<String, Set<String>> tagKeyToValues) {
for (final Entry<UUID, ClientState> clientStateEntry : clientStates.entrySet()) {
final UUID clientId = clientStateEntry.getKey();
final ClientState clientState = clientStateEntry.getValue();
clientState.clientTags().forEach((tagKey, tagValue) -> {
tagKeyToValues.computeIfAbsent(tagKey, ignored -> new HashSet<>()).add(tagValue);
tagEntryToClients.computeIfAbsent(new TagEntry(tagKey, tagValue), ignored -> new HashSet<>()).add(clientId);
});
}
}
// Visible for testing
static void assignStandbyTasksToClientsWithDifferentTags(final ConstrainedPrioritySet standbyTaskClientsByTaskLoad,
final TaskId activeTaskId,
final UUID activeTaskClient,
final Set<String> rackAwareAssignmentTags,
final Map<UUID, ClientState> clientStates,
final Map<TaskId, Integer> tasksToRemainingStandbys,
final Map<String, Set<String>> tagKeyToValues,
final Map<TagEntry, Set<UUID>> tagEntryToClients,
final Map<TaskId, UUID> pendingStandbyTasksToClientId) {
standbyTaskClientsByTaskLoad.offerAll(clientStates.keySet());
// We set countOfUsedClients to 1 because the client hosting the active task has to be counted as used.
int countOfUsedClients = 1;
int numRemainingStandbys = tasksToRemainingStandbys.get(activeTaskId);
final Map<TagEntry, Set<UUID>> tagEntryToUsedClients = new HashMap<>();
UUID lastUsedClient = activeTaskClient;
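// Keep placing standbys on clients whose tag values have not yet been used for this task,
// stopping once all requested standbys are assigned or no such client remains.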
do {
updateClientsOnAlreadyUsedTagEntries(
lastUsedClient,
countOfUsedClients,
rackAwareAssignmentTags,
clientStates,
tagEntryToClients,
tagKeyToValues,
tagEntryToUsedClients
);
final UUID clientOnUnusedTagDimensions = standbyTaskClientsByTaskLoad.poll(
activeTaskId, uuid -> !isClientUsedOnAnyOfTheTagEntries(uuid, tagEntryToUsedClients)
);
if (clientOnUnusedTagDimensions == null) {
break;
}
clientStates.get(clientOnUnusedTagDimensions).assignStandby(activeTaskId);
countOfUsedClients++;
numRemainingStandbys--;
lastUsedClient = clientOnUnusedTagDimensions;
} while (numRemainingStandbys > 0);
if (numRemainingStandbys > 0) {
pendingStandbyTasksToClientId.put(activeTaskId, activeTaskClient);
tasksToRemainingStandbys.put(activeTaskId, numRemainingStandbys);
} else {
tasksToRemainingStandbys.remove(activeTaskId);
}
}
private static boolean isClientUsedOnAnyOfTheTagEntries(final UUID client,
final Map<TagEntry, Set<UUID>> tagEntryToUsedClients) {
return tagEntryToUsedClients.values().stream().anyMatch(usedClients -> usedClients.contains(client));
}
private static void updateClientsOnAlreadyUsedTagEntries(final UUID usedClient,
final int countOfUsedClients,
final Set<String> rackAwareAssignmentTags,
final Map<UUID, ClientState> clientStates,
final Map<TagEntry, Set<UUID>> tagEntryToClients,
final Map<String, Set<String>> tagKeyToValues,
final Map<TagEntry, Set<UUID>> tagEntryToUsedClients) {
final Map<String, String> usedClientTags = clientStates.get(usedClient).clientTags();
for (final Entry<String, String> usedClientTagEntry : usedClientTags.entrySet()) {
final String tagKey = usedClientTagEntry.getKey();
if (!rackAwareAssignmentTags.contains(tagKey)) {
log.warn("Client tag with key [{}] will be ignored when computing rack aware standby " +
"task assignment because it is not part of the configured rack awareness [{}].",
tagKey, rackAwareAssignmentTags);
continue;
}
final Set<String> allTagValues = tagKeyToValues.get(tagKey);
// Consider the following client setup where we need to distribute 2 standby tasks for each stateful task.
//
// # Kafka Streams Client 1
// client.tag.zone: eu-central-1a
// client.tag.cluster: k8s-cluster1
// rack.aware.assignment.tags: zone,cluster
//
// # Kafka Streams Client 2
// client.tag.zone: eu-central-1b
// client.tag.cluster: k8s-cluster1
// rack.aware.assignment.tags: zone,cluster
//
// # Kafka Streams Client 3
// client.tag.zone: eu-central-1c
// client.tag.cluster: k8s-cluster1
// rack.aware.assignment.tags: zone,cluster
//
// # Kafka Streams Client 4
// client.tag.zone: eu-central-1a
// client.tag.cluster: k8s-cluster2
// rack.aware.assignment.tags: zone,cluster
//
// # Kafka Streams Client 5
// client.tag.zone: eu-central-1b
// client.tag.cluster: k8s-cluster2
// rack.aware.assignment.tags: zone,cluster
//
// # Kafka Streams Client 6
// client.tag.zone: eu-central-1c
// client.tag.cluster: k8s-cluster2
// rack.aware.assignment.tags: zone,cluster
//
// Since we have only two unique `cluster` tag values,
// we can only achieve "ideal" distribution on the 1st standby task assignment.
// Ideal distribution for the 1st standby task can be achieved because we can assign standby task
// to the client located on different cluster and zone compared to an active task.
// We can't consider the `cluster` tag for the 2nd standby task assignment because the 1st standby
// task would already be assigned on a different cluster compared to the active one, which means
// we have already used all the available cluster tag values. Taking the `cluster` tag into consideration
// for the 2nd standby task assignment would effectively mean excluding all the clients.
// Instead, for the 2nd standby task, we can only achieve partial rack awareness based on the `zone` tag.
// As we don't consider the `cluster` tag for the 2nd standby task assignment, partial rack awareness
// can be satisfied by placing the 2nd standby task on a client with a different `zone` tag compared to the active and corresponding standby tasks.
// A `zone` in either `cluster` is a valid candidate for partial rack awareness, as our goal is to distribute clients across different `zone` tags.
// This check verifies whether we have already used at least as many clients as there are unique values for the given tag,
// and if so, removes those tag entries from the tagEntryToUsedClients map so the tag no longer constrains the candidate set.
if (allTagValues.size() <= countOfUsedClients) {
allTagValues.forEach(tagValue -> tagEntryToUsedClients.remove(new TagEntry(tagKey, tagValue)));
} else {
final String tagValue = usedClientTagEntry.getValue();
final TagEntry tagEntry = new TagEntry(tagKey, tagValue);
final Set<UUID> clientsOnUsedTagValue = tagEntryToClients.get(tagEntry);
tagEntryToUsedClients.put(tagEntry, clientsOnUsedTagValue);
}
}
}
// Visible for testing
static final class TagEntry {
private final String tagKey;
private final String tagValue;
TagEntry(final String tagKey, final String tagValue) {
this.tagKey = tagKey;
this.tagValue = tagValue;
}
@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final TagEntry that = (TagEntry) o;
return Objects.equals(tagKey, that.tagKey) && Objects.equals(tagValue, that.tagValue);
}
@Override
public int hashCode() {
return Objects.hash(tagKey, tagValue);
}
}
}


@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals.assignment;
import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import static org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.computeTasksToRemainingStandbys;
import static org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.createLeastLoadedPrioritySetConstrainedByAssignedTask;
import static org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks;
/**
* Default standby task assignor that distributes standby tasks to the least loaded clients.
*
* @see ClientTagAwareStandbyTaskAssignor
*/
class DefaultStandbyTaskAssignor implements StandbyTaskAssignor {
private static final Logger log = LoggerFactory.getLogger(DefaultStandbyTaskAssignor.class);
@Override
public boolean assign(final Map<UUID, ClientState> clients,
final Set<TaskId> allTaskIds,
final Set<TaskId> statefulTaskIds,
final AssignorConfiguration.AssignmentConfigs configs) {
final int numStandbyReplicas = configs.numStandbyReplicas;
final Map<TaskId, Integer> tasksToRemainingStandbys = computeTasksToRemainingStandbys(numStandbyReplicas,
statefulTaskIds);
final ConstrainedPrioritySet standbyTaskClientsByTaskLoad = createLeastLoadedPrioritySetConstrainedByAssignedTask(clients);
standbyTaskClientsByTaskLoad.offerAll(clients.keySet());
for (final TaskId task : statefulTaskIds) {
final int numRemainingStandbys = pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks(clients,
tasksToRemainingStandbys,
standbyTaskClientsByTaskLoad,
task);
if (numRemainingStandbys > 0) {
log.warn("Unable to assign {} of {} standby tasks for task [{}]. " +
"There is not enough available capacity. You should " +
"increase the number of application instances " +
"to maintain the requested number of standby replicas.",
numRemainingStandbys, numStandbyReplicas, task);
}
}
// returning false, because standby task assignment will never require a follow-up probing rebalance.
return false;
}
}


@ -33,8 +33,8 @@ import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.kafka.common.utils.Utils.diff;
import static org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignActiveTaskMovements;
@ -55,8 +55,9 @@ public class HighAvailabilityTaskAssignor implements TaskAssignor {
assignStandbyReplicaTasks(
clientStates,
allTaskIds,
statefulTasks,
configs.numStandbyReplicas
configs
);
final AtomicInteger remainingWarmupReplicas = new AtomicInteger(configs.maxWarmupReplicas);
@ -93,10 +94,10 @@ public class HighAvailabilityTaskAssignor implements TaskAssignor {
final boolean probingRebalanceNeeded = neededActiveTaskMovements + neededStandbyTaskMovements > 0;
log.info("Decided on assignment: " +
clientStates +
" with" +
(probingRebalanceNeeded ? "" : " no") +
" followup probing rebalance.");
clientStates +
" with" +
(probingRebalanceNeeded ? "" : " no") +
" followup probing rebalance.");
return probingRebalanceNeeded;
}
@ -115,55 +116,46 @@ public class HighAvailabilityTaskAssignor implements TaskAssignor {
clientStates,
ClientState::activeTasks,
ClientState::unassignActive,
ClientState::assignActive
ClientState::assignActive,
(source, destination) -> true
);
}
private static void assignStandbyReplicaTasks(final TreeMap<UUID, ClientState> clientStates,
final Set<TaskId> statefulTasks,
final int numStandbyReplicas) {
final Map<TaskId, Integer> tasksToRemainingStandbys =
statefulTasks.stream().collect(Collectors.toMap(task -> task, t -> numStandbyReplicas));
final ConstrainedPrioritySet standbyTaskClientsByTaskLoad = new ConstrainedPrioritySet(
(client, task) -> !clientStates.get(client).hasAssignedTask(task),
client -> clientStates.get(client).assignedTaskLoad()
);
standbyTaskClientsByTaskLoad.offerAll(clientStates.keySet());
for (final TaskId task : statefulTasks) {
int numRemainingStandbys = tasksToRemainingStandbys.get(task);
while (numRemainingStandbys > 0) {
final UUID client = standbyTaskClientsByTaskLoad.poll(task);
if (client == null) {
break;
}
clientStates.get(client).assignStandby(task);
numRemainingStandbys--;
standbyTaskClientsByTaskLoad.offer(client);
}
if (numRemainingStandbys > 0) {
log.warn("Unable to assign {} of {} standby tasks for task [{}]. " +
"There is not enough available capacity. You should " +
"increase the number of application instances " +
"to maintain the requested number of standby replicas.",
numRemainingStandbys, numStandbyReplicas, task);
}
private void assignStandbyReplicaTasks(final TreeMap<UUID, ClientState> clientStates,
final Set<TaskId> allTaskIds,
final Set<TaskId> statefulTasks,
final AssignmentConfigs configs) {
if (configs.numStandbyReplicas == 0) {
return;
}
final StandbyTaskAssignor standbyTaskAssignor = createStandbyTaskAssignor(configs);
standbyTaskAssignor.assign(clientStates, allTaskIds, statefulTasks, configs);
balanceTasksOverThreads(
clientStates,
ClientState::standbyTasks,
ClientState::unassignStandby,
ClientState::assignStandby
ClientState::assignStandby,
standbyTaskAssignor::isAllowedTaskMovement
);
}
// Visible for testing
static StandbyTaskAssignor createStandbyTaskAssignor(final AssignmentConfigs configs) {
if (!configs.rackAwareAssignmentTags.isEmpty()) {
return new ClientTagAwareStandbyTaskAssignor();
} else {
return new DefaultStandbyTaskAssignor();
}
}
private static void balanceTasksOverThreads(final SortedMap<UUID, ClientState> clientStates,
final Function<ClientState, Set<TaskId>> currentAssignmentAccessor,
final BiConsumer<ClientState, TaskId> taskUnassignor,
final BiConsumer<ClientState, TaskId> taskAssignor) {
final BiConsumer<ClientState, TaskId> taskAssignor,
final BiPredicate<ClientState, ClientState> taskMovementAttemptPredicate) {
boolean keepBalancing = true;
while (keepBalancing) {
keepBalancing = false;
@ -182,7 +174,10 @@ public class HighAvailabilityTaskAssignor implements TaskAssignor {
final Iterator<TaskId> sourceIterator = sourceTasks.iterator();
while (shouldMoveATask(sourceClientState, destinationClientState) && sourceIterator.hasNext()) {
final TaskId taskToMove = sourceIterator.next();
final boolean canMove = !destinationClientState.hasAssignedTask(taskToMove);
final boolean canMove = !destinationClientState.hasAssignedTask(taskToMove)
// When ClientTagAwareStandbyTaskAssignor is used, we need to make sure that
// the sourceClient tags match the destinationClient tags.
&& taskMovementAttemptPredicate.test(sourceClientState, destinationClientState);
if (canMove) {
taskUnassignor.accept(sourceClientState, taskToMove);
taskAssignor.accept(destinationClientState, taskToMove);


@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals.assignment;
import org.apache.kafka.streams.processor.TaskId;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import static java.util.stream.Collectors.toMap;
final class StandbyTaskAssignmentUtils {
private StandbyTaskAssignmentUtils() {}
static ConstrainedPrioritySet createLeastLoadedPrioritySetConstrainedByAssignedTask(final Map<UUID, ClientState> clients) {
return new ConstrainedPrioritySet((client, t) -> !clients.get(client).hasAssignedTask(t),
client -> clients.get(client).assignedTaskLoad());
}
static int pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks(final Map<UUID, ClientState> clients,
final Map<TaskId, Integer> tasksToRemainingStandbys,
final ConstrainedPrioritySet standbyTaskClientsByTaskLoad,
final TaskId activeTaskId) {
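// Assign standby tasks to the least-loaded clients that can still take this task, updating
// tasksToRemainingStandbys after each assignment; return how many standbys are still unassigned.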
int numRemainingStandbys = tasksToRemainingStandbys.get(activeTaskId);
while (numRemainingStandbys > 0) {
final UUID client = standbyTaskClientsByTaskLoad.poll(activeTaskId);
if (client == null) {
break;
}
clients.get(client).assignStandby(activeTaskId);
numRemainingStandbys--;
standbyTaskClientsByTaskLoad.offer(client);
tasksToRemainingStandbys.put(activeTaskId, numRemainingStandbys);
}
return numRemainingStandbys;
}
static Map<TaskId, Integer> computeTasksToRemainingStandbys(final int numStandbyReplicas,
final Set<TaskId> statefulTaskIds) {
return statefulTaskIds.stream().collect(toMap(Function.identity(), t -> numStandbyReplicas));
}
}


@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals.assignment;
interface StandbyTaskAssignor extends TaskAssignor {
default boolean isAllowedTaskMovement(final ClientState source, final ClientState destination) {
return true;
}
}


@ -17,6 +17,8 @@
package org.apache.kafka.streams.processor.internals.assignment;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map.Entry;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListOffsetsResult;
@ -102,6 +104,8 @@ public final class AssignmentTestUtils {
public static final Set<TaskId> EMPTY_TASKS = emptySet();
public static final Map<TopicPartition, Long> EMPTY_CHANGELOG_END_OFFSETS = new HashMap<>();
public static final List<String> EMPTY_RACK_AWARE_ASSIGNMENT_TAGS = Collections.emptyList();
public static final Map<String, String> EMPTY_CLIENT_TAGS = Collections.emptyMap();
private AssignmentTestUtils() {}


@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals.assignment;
import org.apache.kafka.common.config.ConfigException;
import org.junit.Test;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertThrows;
@ -29,7 +30,7 @@ public class AssignorConfigurationTest {
public void configsShouldRejectZeroWarmups() {
final ConfigException exception = assertThrows(
ConfigException.class,
() -> new AssignorConfiguration.AssignmentConfigs(1L, 0, 1, 1L)
() -> new AssignorConfiguration.AssignmentConfigs(1L, 0, 1, 1L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertThat(exception.getMessage(), containsString("Invalid value 0 for configuration max.warmup.replicas"));


@ -33,6 +33,7 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.common.utils.Utils.mkSortedSet;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_CLIENT_TAGS;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.NAMED_TASK_T0_0_0;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.NAMED_TASK_T1_0_0;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0;
@ -52,6 +53,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
@ -66,6 +68,7 @@ public class ClientStateTest {
mkSet(TASK_0_0, TASK_0_1),
mkSet(TASK_0_2, TASK_0_3),
mkMap(mkEntry(TASK_0_0, 5L), mkEntry(TASK_0_2, -1L)),
EMPTY_CLIENT_TAGS,
4
);
@ -531,4 +534,15 @@ public class ClientStateTest {
assertThrows(IllegalStateException.class, () -> client.assignActiveToConsumer(TASK_0_0, "c1"));
}
@Test
public void shouldReturnClientTags() {
final Map<String, String> clientTags = mkMap(mkEntry("k1", "v1"));
assertEquals(clientTags, new ClientState(0, clientTags).clientTags());
}
@Test
public void shouldReturnEmptyClientTagsMapByDefault() {
assertTrue(new ClientState().clientTags().isEmpty());
}
}


@ -0,0 +1,641 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals.assignment;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import org.apache.kafka.streams.processor.internals.assignment.ClientTagAwareStandbyTaskAssignor.TagEntry;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_1;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_2;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_0;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_1;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_2;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.uuidForInt;
import static org.apache.kafka.streams.processor.internals.assignment.ClientTagAwareStandbyTaskAssignor.assignStandbyTasksToClientsWithDifferentTags;
import static org.apache.kafka.streams.processor.internals.assignment.ClientTagAwareStandbyTaskAssignor.fillClientsTagStatistics;
import static org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.computeTasksToRemainingStandbys;
import static org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.createLeastLoadedPrioritySetConstrainedByAssignedTask;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class ClientTagAwareStandbyTaskAssignorTest {
private static final String ZONE_TAG = "zone";
private static final String CLUSTER_TAG = "cluster";
private static final String ZONE_1 = "zone1";
private static final String ZONE_2 = "zone2";
private static final String ZONE_3 = "zone3";
private static final String CLUSTER_1 = "cluster1";
private static final String CLUSTER_2 = "cluster2";
private static final String CLUSTER_3 = "cluster3";
private static final UUID UUID_1 = uuidForInt(1);
private static final UUID UUID_2 = uuidForInt(2);
private static final UUID UUID_3 = uuidForInt(3);
private static final UUID UUID_4 = uuidForInt(4);
private static final UUID UUID_5 = uuidForInt(5);
private static final UUID UUID_6 = uuidForInt(6);
private static final UUID UUID_7 = uuidForInt(7);
private static final UUID UUID_8 = uuidForInt(8);
private static final UUID UUID_9 = uuidForInt(9);
private StandbyTaskAssignor standbyTaskAssignor;
@Before
public void setup() {
standbyTaskAssignor = new ClientTagAwareStandbyTaskAssignor();
}
@Test
public void shouldRemoveClientToRemainingStandbysAndNotPopulatePendingStandbyTasksToClientIdWhenAllStandbyTasksWereAssigned() {
final Set<String> rackAwareAssignmentTags = mkSet(ZONE_TAG, CLUSTER_TAG);
final Map<UUID, ClientState> clientStates = mkMap(
mkEntry(UUID_1, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_1), mkEntry(CLUSTER_TAG, CLUSTER_1)), TASK_0_0)),
mkEntry(UUID_2, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_1), mkEntry(CLUSTER_TAG, CLUSTER_2)), TASK_0_1)),
mkEntry(UUID_3, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_1), mkEntry(CLUSTER_TAG, CLUSTER_3)), TASK_0_2))
);
final ConstrainedPrioritySet constrainedPrioritySet = createLeastLoadedPrioritySetConstrainedByAssignedTask(clientStates);
final Set<TaskId> allActiveTasks = findAllActiveTasks(clientStates);
final Map<TaskId, UUID> taskToClientId = mkMap(mkEntry(TASK_0_0, UUID_1),
mkEntry(TASK_0_1, UUID_2),
mkEntry(TASK_0_2, UUID_3));
final Map<String, Set<String>> tagKeyToValues = new HashMap<>();
final Map<TagEntry, Set<UUID>> tagEntryToClients = new HashMap<>();
fillClientsTagStatistics(clientStates, tagEntryToClients, tagKeyToValues);
final Map<TaskId, UUID> pendingStandbyTasksToClientId = new HashMap<>();
final Map<TaskId, Integer> tasksToRemainingStandbys = computeTasksToRemainingStandbys(2, allActiveTasks);
for (final TaskId activeTaskId : allActiveTasks) {
assignStandbyTasksToClientsWithDifferentTags(
constrainedPrioritySet,
activeTaskId,
taskToClientId.get(activeTaskId),
rackAwareAssignmentTags,
clientStates,
tasksToRemainingStandbys,
tagKeyToValues,
tagEntryToClients,
pendingStandbyTasksToClientId
);
}
assertTrue(tasksToRemainingStandbys.isEmpty());
assertTrue(pendingStandbyTasksToClientId.isEmpty());
}
@Test
public void shouldUpdateClientToRemainingStandbysAndPendingStandbyTasksToClientIdWhenNotAllStandbyTasksWereAssigned() {
final Set<String> rackAwareAssignmentTags = mkSet(ZONE_TAG, CLUSTER_TAG);
final Map<UUID, ClientState> clientStates = mkMap(
mkEntry(UUID_1, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_1), mkEntry(CLUSTER_TAG, CLUSTER_1)), TASK_0_0)),
mkEntry(UUID_2, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_1), mkEntry(CLUSTER_TAG, CLUSTER_2)), TASK_0_1)),
mkEntry(UUID_3, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_1), mkEntry(CLUSTER_TAG, CLUSTER_3)), TASK_0_2))
);
final ConstrainedPrioritySet constrainedPrioritySet = createLeastLoadedPrioritySetConstrainedByAssignedTask(clientStates);
final Set<TaskId> allActiveTasks = findAllActiveTasks(clientStates);
final Map<TaskId, UUID> taskToClientId = mkMap(mkEntry(TASK_0_0, UUID_1),
mkEntry(TASK_0_1, UUID_2),
mkEntry(TASK_0_2, UUID_3));
final Map<String, Set<String>> tagKeyToValues = new HashMap<>();
final Map<TagEntry, Set<UUID>> tagEntryToClients = new HashMap<>();
fillClientsTagStatistics(clientStates, tagEntryToClients, tagKeyToValues);
final Map<TaskId, UUID> pendingStandbyTasksToClientId = new HashMap<>();
final Map<TaskId, Integer> tasksToRemainingStandbys = computeTasksToRemainingStandbys(3, allActiveTasks);
for (final TaskId activeTaskId : allActiveTasks) {
assignStandbyTasksToClientsWithDifferentTags(
constrainedPrioritySet,
activeTaskId,
taskToClientId.get(activeTaskId),
rackAwareAssignmentTags,
clientStates,
tasksToRemainingStandbys,
tagKeyToValues,
tagEntryToClients,
pendingStandbyTasksToClientId
);
}
allActiveTasks.forEach(
activeTaskId -> assertEquals(String.format("Active task with id [%s] didn't match expected number " +
"of remaining standbys value.", activeTaskId),
1,
tasksToRemainingStandbys.get(activeTaskId).longValue())
);
allActiveTasks.forEach(
activeTaskId -> assertEquals(String.format("Active task with id [%s] didn't match expected " +
"client ID value.", activeTaskId),
taskToClientId.get(activeTaskId),
pendingStandbyTasksToClientId.get(activeTaskId))
);
}
@Test
public void shouldPermitTaskMovementWhenClientTagsMatch() {
final ClientState source = createClientStateWithCapacity(1, mkMap(mkEntry(ZONE_TAG, ZONE_1), mkEntry(CLUSTER_TAG, CLUSTER_1)));
final ClientState destination = createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_1), mkEntry(CLUSTER_TAG, CLUSTER_1)));
assertTrue(standbyTaskAssignor.isAllowedTaskMovement(source, destination));
}
@Test
public void shouldDeclineTaskMovementWhenClientTagsDoNotMatch() {
final ClientState source = createClientStateWithCapacity(1, mkMap(mkEntry(ZONE_TAG, ZONE_1), mkEntry(CLUSTER_TAG, CLUSTER_1)));
final ClientState destination = createClientStateWithCapacity(1, mkMap(mkEntry(ZONE_TAG, ZONE_2), mkEntry(CLUSTER_TAG, CLUSTER_1)));
assertFalse(standbyTaskAssignor.isAllowedTaskMovement(source, destination));
}
@Test
public void shouldDistributeStandbyTasksWhenActiveTasksAreLocatedOnSameZone() {
final Map<UUID, ClientState> clientStates = mkMap(
mkEntry(UUID_1, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_1), mkEntry(CLUSTER_TAG, CLUSTER_1)), TASK_0_0, TASK_1_0)),
mkEntry(UUID_2, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_2), mkEntry(CLUSTER_TAG, CLUSTER_1)))),
mkEntry(UUID_3, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_3), mkEntry(CLUSTER_TAG, CLUSTER_1)))),
mkEntry(UUID_4, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_1), mkEntry(CLUSTER_TAG, CLUSTER_2)), TASK_0_1, TASK_1_1)),
mkEntry(UUID_5, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_2), mkEntry(CLUSTER_TAG, CLUSTER_2)))),
mkEntry(UUID_6, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_3), mkEntry(CLUSTER_TAG, CLUSTER_2)))),
mkEntry(UUID_7, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_1), mkEntry(CLUSTER_TAG, CLUSTER_3)), TASK_0_2, TASK_1_2)),
mkEntry(UUID_8, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_2), mkEntry(CLUSTER_TAG, CLUSTER_3)))),
mkEntry(UUID_9, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_3), mkEntry(CLUSTER_TAG, CLUSTER_3))))
);
final Set<TaskId> allActiveTasks = findAllActiveTasks(clientStates);
final AssignmentConfigs assignmentConfigs = newAssignmentConfigs(2, ZONE_TAG, CLUSTER_TAG);
standbyTaskAssignor.assign(clientStates, allActiveTasks, allActiveTasks, assignmentConfigs);
assertTrue(clientStates.values().stream().allMatch(ClientState::reachedCapacity));
Stream.of(UUID_1, UUID_4, UUID_7).forEach(client -> assertStandbyTaskCountForClientEqualsTo(clientStates, client, 0));
Stream.of(UUID_2, UUID_3, UUID_5, UUID_6, UUID_8, UUID_9).forEach(client -> assertStandbyTaskCountForClientEqualsTo(clientStates, client, 2));
assertTotalNumberOfStandbyTasksEqualsTo(clientStates, 12);
assertTrue(
standbyClientsHonorRackAwareness(
TASK_0_0,
clientStates,
asList(
mkSet(UUID_9, UUID_5), mkSet(UUID_6, UUID_8)
)
)
);
assertTrue(
standbyClientsHonorRackAwareness(
TASK_1_0,
clientStates,
asList(
mkSet(UUID_9, UUID_5), mkSet(UUID_6, UUID_8)
)
)
);
assertTrue(
standbyClientsHonorRackAwareness(
TASK_0_1,
clientStates,
asList(
mkSet(UUID_2, UUID_9), mkSet(UUID_3, UUID_8)
)
)
);
assertTrue(
standbyClientsHonorRackAwareness(
TASK_1_1,
clientStates,
asList(
mkSet(UUID_2, UUID_9), mkSet(UUID_3, UUID_8)
)
)
);
assertTrue(
standbyClientsHonorRackAwareness(
TASK_0_2,
clientStates,
asList(
mkSet(UUID_5, UUID_3), mkSet(UUID_2, UUID_6)
)
)
);
assertTrue(
standbyClientsHonorRackAwareness(
TASK_1_2,
clientStates,
asList(
mkSet(UUID_5, UUID_3), mkSet(UUID_2, UUID_6)
)
)
);
}
@Test
public void shouldDistributeStandbyTasksWhenActiveTasksAreLocatedOnSameCluster() {
final Map<UUID, ClientState> clientStates = mkMap(
mkEntry(UUID_1, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_1), mkEntry(CLUSTER_TAG, CLUSTER_1)), TASK_0_0, TASK_1_0)),
mkEntry(UUID_2, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_2), mkEntry(CLUSTER_TAG, CLUSTER_1)), TASK_0_1, TASK_1_1)),
mkEntry(UUID_3, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_3), mkEntry(CLUSTER_TAG, CLUSTER_1)), TASK_0_2, TASK_1_2)),
mkEntry(UUID_4, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_1), mkEntry(CLUSTER_TAG, CLUSTER_2)))),
mkEntry(UUID_5, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_2), mkEntry(CLUSTER_TAG, CLUSTER_2)))),
mkEntry(UUID_6, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_3), mkEntry(CLUSTER_TAG, CLUSTER_2)))),
mkEntry(UUID_7, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_1), mkEntry(CLUSTER_TAG, CLUSTER_3)))),
mkEntry(UUID_8, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_2), mkEntry(CLUSTER_TAG, CLUSTER_3)))),
mkEntry(UUID_9, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_3), mkEntry(CLUSTER_TAG, CLUSTER_3))))
);
final Set<TaskId> allActiveTasks = findAllActiveTasks(clientStates);
final AssignmentConfigs assignmentConfigs = newAssignmentConfigs(2, ZONE_TAG, CLUSTER_TAG);
standbyTaskAssignor.assign(clientStates, allActiveTasks, allActiveTasks, assignmentConfigs);
assertTrue(clientStates.values().stream().allMatch(ClientState::reachedCapacity));
Stream.of(UUID_1, UUID_2, UUID_3).forEach(client -> assertStandbyTaskCountForClientEqualsTo(clientStates, client, 0));
Stream.of(UUID_4, UUID_5, UUID_6, UUID_7, UUID_8, UUID_9).forEach(client -> assertStandbyTaskCountForClientEqualsTo(clientStates, client, 2));
assertTotalNumberOfStandbyTasksEqualsTo(clientStates, 12);
assertTrue(
standbyClientsHonorRackAwareness(
TASK_0_0,
clientStates,
asList(
mkSet(UUID_9, UUID_5), mkSet(UUID_6, UUID_8)
)
)
);
assertTrue(
standbyClientsHonorRackAwareness(
TASK_1_0,
clientStates,
asList(
mkSet(UUID_9, UUID_5), mkSet(UUID_6, UUID_8)
)
)
);
assertTrue(
standbyClientsHonorRackAwareness(
TASK_0_1,
clientStates,
asList(
mkSet(UUID_4, UUID_9), mkSet(UUID_6, UUID_7)
)
)
);
assertTrue(
standbyClientsHonorRackAwareness(
TASK_1_1,
clientStates,
asList(
mkSet(UUID_4, UUID_9), mkSet(UUID_6, UUID_7)
)
)
);
assertTrue(
standbyClientsHonorRackAwareness(
TASK_0_2,
clientStates,
asList(
mkSet(UUID_5, UUID_7), mkSet(UUID_4, UUID_8)
)
)
);
assertTrue(
standbyClientsHonorRackAwareness(
TASK_1_2,
clientStates,
asList(
mkSet(UUID_5, UUID_7), mkSet(UUID_4, UUID_8)
)
)
);
}
@Test
public void shouldDoThePartialRackAwareness() {
final Map<UUID, ClientState> clientStates = mkMap(
mkEntry(UUID_1, createClientStateWithCapacity(1, mkMap(mkEntry(CLUSTER_TAG, CLUSTER_1), mkEntry(ZONE_TAG, ZONE_1)), TASK_0_0)),
mkEntry(UUID_2, createClientStateWithCapacity(1, mkMap(mkEntry(CLUSTER_TAG, CLUSTER_1), mkEntry(ZONE_TAG, ZONE_2)))),
mkEntry(UUID_3, createClientStateWithCapacity(1, mkMap(mkEntry(CLUSTER_TAG, CLUSTER_1), mkEntry(ZONE_TAG, ZONE_3)))),
mkEntry(UUID_4, createClientStateWithCapacity(1, mkMap(mkEntry(CLUSTER_TAG, CLUSTER_2), mkEntry(ZONE_TAG, ZONE_1)))),
mkEntry(UUID_5, createClientStateWithCapacity(1, mkMap(mkEntry(CLUSTER_TAG, CLUSTER_2), mkEntry(ZONE_TAG, ZONE_2)))),
mkEntry(UUID_6, createClientStateWithCapacity(1, mkMap(mkEntry(CLUSTER_TAG, CLUSTER_2), mkEntry(ZONE_TAG, ZONE_3)), TASK_1_0))
);
final Set<TaskId> allActiveTasks = findAllActiveTasks(clientStates);
final AssignmentConfigs assignmentConfigs = newAssignmentConfigs(2, CLUSTER_TAG, ZONE_TAG);
standbyTaskAssignor.assign(clientStates, allActiveTasks, allActiveTasks, assignmentConfigs);
// We need to distribute 2 standby tasks (+1 active task).
// Since we have only two unique `cluster` tag values,
// we can only achieve "ideal" distribution on the 1st standby task assignment.
// We can't consider the `cluster` tag for the 2nd standby task assignment because the 1st standby
// task would already be assigned on a different cluster compared to the active one, which means
// we have already used all the available cluster tag values. Taking the `cluster` tag into consideration
// for the 2nd standby task assignment would effectively mean excluding all the clients.
// Instead, for the 2nd standby task, we can only achieve partial rack awareness based on the `zone` tag.
// As we don't consider the `cluster` tag for the 2nd standby task assignment, partial rack awareness
// can be satisfied by placing the 2nd standby task on a client with a different `zone` tag compared to the active and corresponding standby tasks.
// A `zone` in either `cluster` is a valid candidate for partial rack awareness, as our goal is to distribute clients across different `zone` tags.
Stream.of(UUID_2, UUID_5).forEach(client -> assertStandbyTaskCountForClientEqualsTo(clientStates, client, 1));
// There's no strong guarantee where 2nd standby task will end up.
Stream.of(UUID_1, UUID_3, UUID_4, UUID_6).forEach(client -> assertStandbyTaskCountForClientEqualsTo(clientStates, client, 0, 1));
assertTotalNumberOfStandbyTasksEqualsTo(clientStates, 4);
assertTrue(
standbyClientsHonorRackAwareness(
TASK_0_0,
clientStates,
asList(
// Since it's located on a different `cluster` and `zone` tag dimensions,
// `UUID_5` is the "ideal" distribution for the 1st standby task assignment.
// For the 2nd standby, either `UUID_3` or `UUID_6` are valid destinations as
// we need to distribute the clients on different `zone`
// tags without considering the `cluster` tag value.
mkSet(UUID_5, UUID_3),
mkSet(UUID_5, UUID_6)
)
)
);
assertTrue(
standbyClientsHonorRackAwareness(
TASK_1_0,
clientStates,
asList(
// The same comment as above applies here too.
// `UUID_2` is the ideal distribution on different `cluster`
// and `zone` tag dimensions. In contrast, `UUID_4` and `UUID_1`
// satisfy only the partial rack awareness as they are located on a different `zone` tag dimension.
mkSet(UUID_2, UUID_4),
mkSet(UUID_2, UUID_1)
)
)
);
}
@Test
public void shouldDistributeClientsOnDifferentZoneTagsEvenWhenClientsReachedCapacity() {
final Map<UUID, ClientState> clientStates = mkMap(
mkEntry(UUID_1, createClientStateWithCapacity(1, mkMap(mkEntry(ZONE_TAG, ZONE_1), mkEntry(CLUSTER_TAG, CLUSTER_1)), TASK_0_0)),
mkEntry(UUID_2, createClientStateWithCapacity(1, mkMap(mkEntry(ZONE_TAG, ZONE_2), mkEntry(CLUSTER_TAG, CLUSTER_1)), TASK_0_1)),
mkEntry(UUID_3, createClientStateWithCapacity(1, mkMap(mkEntry(ZONE_TAG, ZONE_3), mkEntry(CLUSTER_TAG, CLUSTER_1)), TASK_0_2)),
mkEntry(UUID_4, createClientStateWithCapacity(1, mkMap(mkEntry(ZONE_TAG, ZONE_1), mkEntry(CLUSTER_TAG, CLUSTER_1)), TASK_1_0)),
mkEntry(UUID_5, createClientStateWithCapacity(1, mkMap(mkEntry(ZONE_TAG, ZONE_2), mkEntry(CLUSTER_TAG, CLUSTER_1)), TASK_1_1)),
mkEntry(UUID_6, createClientStateWithCapacity(1, mkMap(mkEntry(ZONE_TAG, ZONE_3), mkEntry(CLUSTER_TAG, CLUSTER_1)), TASK_1_2))
);
final Set<TaskId> allActiveTasks = findAllActiveTasks(clientStates);
final AssignmentConfigs assignmentConfigs = newAssignmentConfigs(1, ZONE_TAG, CLUSTER_TAG);
standbyTaskAssignor.assign(clientStates, allActiveTasks, allActiveTasks, assignmentConfigs);
clientStates.keySet().forEach(client -> assertStandbyTaskCountForClientEqualsTo(clientStates, client, 1));
assertTotalNumberOfStandbyTasksEqualsTo(clientStates, 6);
assertTrue(
standbyClientsHonorRackAwareness(
TASK_0_0,
clientStates,
asList(
mkSet(UUID_2), mkSet(UUID_5), mkSet(UUID_3), mkSet(UUID_6)
)
)
);
assertTrue(
standbyClientsHonorRackAwareness(
TASK_1_0,
clientStates,
asList(
mkSet(UUID_2), mkSet(UUID_5), mkSet(UUID_3), mkSet(UUID_6)
)
)
);
assertTrue(
standbyClientsHonorRackAwareness(
TASK_0_1,
clientStates,
asList(
mkSet(UUID_1), mkSet(UUID_4), mkSet(UUID_3), mkSet(UUID_6)
)
)
);
assertTrue(
standbyClientsHonorRackAwareness(
TASK_1_1,
clientStates,
asList(
mkSet(UUID_1), mkSet(UUID_4), mkSet(UUID_3), mkSet(UUID_6)
)
)
);
assertTrue(
standbyClientsHonorRackAwareness(
TASK_0_2,
clientStates,
asList(
mkSet(UUID_1), mkSet(UUID_4), mkSet(UUID_2), mkSet(UUID_5)
)
)
);
assertTrue(
standbyClientsHonorRackAwareness(
TASK_1_2,
clientStates,
asList(
mkSet(UUID_1), mkSet(UUID_4), mkSet(UUID_2), mkSet(UUID_5)
)
)
);
}
@Test
public void shouldIgnoreTagsThatAreNotPresentInRackAwareness() {
final Map<UUID, ClientState> clientStates = mkMap(
mkEntry(UUID_1, createClientStateWithCapacity(1, mkMap(mkEntry(CLUSTER_TAG, CLUSTER_1), mkEntry(ZONE_TAG, ZONE_1)), TASK_0_0)),
mkEntry(UUID_2, createClientStateWithCapacity(2, mkMap(mkEntry(CLUSTER_TAG, CLUSTER_1), mkEntry(ZONE_TAG, ZONE_2)))),
mkEntry(UUID_3, createClientStateWithCapacity(1, mkMap(mkEntry(CLUSTER_TAG, CLUSTER_2), mkEntry(ZONE_TAG, ZONE_1))))
);
final Set<TaskId> allActiveTasks = findAllActiveTasks(clientStates);
final AssignmentConfigs assignmentConfigs = newAssignmentConfigs(1, CLUSTER_TAG);
standbyTaskAssignor.assign(clientStates, allActiveTasks, allActiveTasks, assignmentConfigs);
assertTotalNumberOfStandbyTasksEqualsTo(clientStates, 1);
assertEquals(1, clientStates.get(UUID_3).standbyTaskCount());
}
@Test
public void shouldHandleOverlappingTagValuesBetweenDifferentTagKeys() {
final Map<UUID, ClientState> clientStates = mkMap(
mkEntry(UUID_1, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_1), mkEntry(CLUSTER_TAG, CLUSTER_1)), TASK_0_0)),
mkEntry(UUID_2, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, CLUSTER_1), mkEntry(CLUSTER_TAG, CLUSTER_3))))
);
final Set<TaskId> allActiveTasks = findAllActiveTasks(clientStates);
final AssignmentConfigs assignmentConfigs = newAssignmentConfigs(1, ZONE_TAG, CLUSTER_TAG);
standbyTaskAssignor.assign(clientStates, allActiveTasks, allActiveTasks, assignmentConfigs);
assertTotalNumberOfStandbyTasksEqualsTo(clientStates, 1);
assertTrue(
standbyClientsHonorRackAwareness(
TASK_0_0,
clientStates,
singletonList(
mkSet(UUID_2)
)
)
);
}
@Test
public void shouldDistributeStandbyTasksOnLeastLoadedClientsWhenClientsAreNotOnDifferentTagDimensions() {
final Map<UUID, ClientState> clientStates = mkMap(
mkEntry(UUID_1, createClientStateWithCapacity(3, mkMap(mkEntry(CLUSTER_TAG, CLUSTER_1), mkEntry(ZONE_TAG, ZONE_1)), TASK_0_0)),
mkEntry(UUID_2, createClientStateWithCapacity(3, mkMap(mkEntry(CLUSTER_TAG, CLUSTER_1), mkEntry(ZONE_TAG, ZONE_1)), TASK_0_1)),
mkEntry(UUID_3, createClientStateWithCapacity(3, mkMap(mkEntry(CLUSTER_TAG, CLUSTER_1), mkEntry(ZONE_TAG, ZONE_1)), TASK_0_2)),
mkEntry(UUID_4, createClientStateWithCapacity(3, mkMap(mkEntry(CLUSTER_TAG, CLUSTER_1), mkEntry(ZONE_TAG, ZONE_1)), TASK_1_0))
);
final Set<TaskId> allActiveTasks = findAllActiveTasks(clientStates);
final AssignmentConfigs assignmentConfigs = newAssignmentConfigs(1, CLUSTER_TAG, ZONE_TAG);
standbyTaskAssignor.assign(clientStates, allActiveTasks, allActiveTasks, assignmentConfigs);
assertTotalNumberOfStandbyTasksEqualsTo(clientStates, 4);
assertEquals(1, clientStates.get(UUID_1).standbyTaskCount());
assertEquals(1, clientStates.get(UUID_2).standbyTaskCount());
assertEquals(1, clientStates.get(UUID_3).standbyTaskCount());
assertEquals(1, clientStates.get(UUID_4).standbyTaskCount());
}
@Test
public void shouldNotAssignStandbyTasksIfThereAreNoEnoughClients() {
final Map<UUID, ClientState> clientStates = mkMap(
mkEntry(UUID_1, createClientStateWithCapacity(3, mkMap(mkEntry(CLUSTER_TAG, CLUSTER_1), mkEntry(ZONE_TAG, ZONE_1)), TASK_0_0))
);
final Set<TaskId> allActiveTasks = findAllActiveTasks(clientStates);
final AssignmentConfigs assignmentConfigs = newAssignmentConfigs(1, CLUSTER_TAG, ZONE_TAG);
standbyTaskAssignor.assign(clientStates, allActiveTasks, allActiveTasks, assignmentConfigs);
assertTotalNumberOfStandbyTasksEqualsTo(clientStates, 0);
assertEquals(0, clientStates.get(UUID_1).standbyTaskCount());
}
private static void assertTotalNumberOfStandbyTasksEqualsTo(final Map<UUID, ClientState> clientStates, final int expectedTotalNumberOfStandbyTasks) {
final int actualTotalNumberOfStandbyTasks = clientStates.values().stream().map(ClientState::standbyTaskCount).reduce(0, Integer::sum);
assertEquals(expectedTotalNumberOfStandbyTasks, actualTotalNumberOfStandbyTasks);
}
private static void assertStandbyTaskCountForClientEqualsTo(final Map<UUID, ClientState> clientStates,
final UUID client,
final int... expectedStandbyTaskCounts) {
final int standbyTaskCount = clientStates.get(client).standbyTaskCount();
final String msg = String.format("Client [%s] doesn't have expected number of standby tasks. " +
"Expected any of %s, actual [%s]",
client, Arrays.toString(expectedStandbyTaskCounts), standbyTaskCount);
assertTrue(msg, Arrays.stream(expectedStandbyTaskCounts).anyMatch(expectedStandbyTaskCount -> expectedStandbyTaskCount == standbyTaskCount));
}
private static boolean standbyClientsHonorRackAwareness(final TaskId activeTaskId,
final Map<UUID, ClientState> clientStates,
final List<Set<UUID>> validClientIdsBasedOnRackAwareAssignmentTags) {
final Set<UUID> standbyTaskClientIds = findAllStandbyTaskClients(clientStates, activeTaskId);
return validClientIdsBasedOnRackAwareAssignmentTags.stream()
.filter(it -> it.equals(standbyTaskClientIds))
.count() == 1;
}
private static Set<UUID> findAllStandbyTaskClients(final Map<UUID, ClientState> clientStates, final TaskId task) {
return clientStates.keySet()
.stream()
.filter(clientId -> clientStates.get(clientId).standbyTasks().contains(task))
.collect(Collectors.toSet());
}
private static AssignmentConfigs newAssignmentConfigs(final int numStandbyReplicas,
final String... rackAwareAssignmentTags) {
return new AssignmentConfigs(0L,
1,
numStandbyReplicas,
60000L,
asList(rackAwareAssignmentTags));
}
private static ClientState createClientStateWithCapacity(final int capacity,
final Map<String, String> clientTags,
final TaskId... tasks) {
final ClientState clientState = new ClientState(capacity, clientTags);
Optional.ofNullable(tasks).ifPresent(t -> clientState.assignActiveTasks(asList(t)));
return clientState;
}
private static Set<TaskId> findAllActiveTasks(final Map<UUID, ClientState> clientStates) {
return clientStates.entrySet()
.stream()
.flatMap(clientStateEntry -> clientStateEntry.getValue().activeTasks().stream())
.collect(Collectors.toSet());
}
}
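For context, the tags exercised by these tests are meant to come from application configuration. Below is a minimal sketch of how a Streams instance could be tagged under KIP-708; the property names "client.tag.<key>" and "rack.aware.assignment.tags" are the KIP-708 names and are assumptions here, since this PR still defaults rackAwareAssignmentTags to an empty list in AssignorConfiguration.

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class RackAwareStandbyConfigSketch {
    public static Properties streamsProperties() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "rack-aware-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        // KIP-708 (assumed property names): tag this instance with its zone and cluster ...
        props.put("client.tag.zone", "eu-central-1a");
        props.put("client.tag.cluster", "k8s-cluster-1");
        // ... and ask the assignor to place standbys on clients with different zone/cluster values.
        props.put("rack.aware.assignment.tags", "zone,cluster");
        return props;
    }
}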

View File

@ -28,6 +28,7 @@ import java.util.UUID;
import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_1;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_2;
@ -53,7 +54,7 @@ public class FallbackPriorTaskAssignorTest {
clients,
new HashSet<>(taskIds),
new HashSet<>(taskIds),
new AssignorConfiguration.AssignmentConfigs(0L, 1, 0, 60_000L)
new AssignorConfiguration.AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertThat(probingRebalanceNeeded, is(true));

View File

@ -20,8 +20,10 @@ import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import org.junit.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@ -29,10 +31,13 @@ import java.util.stream.Collectors;
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_CLIENT_TAGS;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_TASKS;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_1;
@ -63,6 +68,7 @@ import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class HighAvailabilityTaskAssignorTest {
@ -70,22 +76,24 @@ public class HighAvailabilityTaskAssignorTest {
/*acceptableRecoveryLag*/ 100L,
/*maxWarmupReplicas*/ 2,
/*numStandbyReplicas*/ 0,
/*probingRebalanceIntervalMs*/ 60 * 1000L
/*probingRebalanceIntervalMs*/ 60 * 1000L,
/*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
);
private final AssignmentConfigs configWithStandbys = new AssignmentConfigs(
/*acceptableRecoveryLag*/ 100L,
/*maxWarmupReplicas*/ 2,
/*numStandbyReplicas*/ 1,
/*probingRebalanceIntervalMs*/ 60 * 1000L
/*probingRebalanceIntervalMs*/ 60 * 1000L,
/*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
);
@Test
public void shouldBeStickyForActiveAndStandbyTasksWhileWarmingUp() {
final Set<TaskId> allTaskIds = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_1_0, TASK_1_1, TASK_1_2, TASK_2_0, TASK_2_1, TASK_2_2);
final ClientState clientState1 = new ClientState(allTaskIds, emptySet(), allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 0L)), 1);
final ClientState clientState2 = new ClientState(emptySet(), allTaskIds, allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 10L)), 1);
final ClientState clientState3 = new ClientState(emptySet(), emptySet(), allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> Long.MAX_VALUE)), 1);
final ClientState clientState1 = new ClientState(allTaskIds, emptySet(), allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 0L)), EMPTY_CLIENT_TAGS, 1);
final ClientState clientState2 = new ClientState(emptySet(), allTaskIds, allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 10L)), EMPTY_CLIENT_TAGS, 1);
final ClientState clientState3 = new ClientState(emptySet(), emptySet(), allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> Long.MAX_VALUE)), EMPTY_CLIENT_TAGS, 1);
final Map<UUID, ClientState> clientStates = mkMap(
mkEntry(UUID_1, clientState1),
@ -97,7 +105,7 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTaskIds,
allTaskIds,
new AssignmentConfigs(11L, 2, 1, 60_000L)
new AssignmentConfigs(11L, 2, 1, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertThat(clientState1, hasAssignedTasks(allTaskIds.size()));
@ -112,9 +120,9 @@ public class HighAvailabilityTaskAssignorTest {
@Test
public void shouldSkipWarmupsWhenAcceptableLagIsMax() {
final Set<TaskId> allTaskIds = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_1_0, TASK_1_1, TASK_1_2, TASK_2_0, TASK_2_1, TASK_2_2);
final ClientState clientState1 = new ClientState(allTaskIds, emptySet(), allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 0L)), 1);
final ClientState clientState2 = new ClientState(emptySet(), emptySet(), allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> Long.MAX_VALUE)), 1);
final ClientState clientState3 = new ClientState(emptySet(), emptySet(), allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> Long.MAX_VALUE)), 1);
final ClientState clientState1 = new ClientState(allTaskIds, emptySet(), allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 0L)), EMPTY_CLIENT_TAGS, 1);
final ClientState clientState2 = new ClientState(emptySet(), emptySet(), allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> Long.MAX_VALUE)), EMPTY_CLIENT_TAGS, 1);
final ClientState clientState3 = new ClientState(emptySet(), emptySet(), allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> Long.MAX_VALUE)), EMPTY_CLIENT_TAGS, 1);
final Map<UUID, ClientState> clientStates = mkMap(
mkEntry(UUID_1, clientState1),
@ -126,7 +134,7 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTaskIds,
allTaskIds,
new AssignmentConfigs(Long.MAX_VALUE, 1, 1, 60_000L)
new AssignmentConfigs(Long.MAX_VALUE, 1, 1, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertThat(clientState1, hasAssignedTasks(6));
@ -139,15 +147,15 @@ public class HighAvailabilityTaskAssignorTest {
public void shouldAssignActiveStatefulTasksEvenlyOverClientsWhereNumberOfClientsIntegralDivisorOfNumberOfTasks() {
final Set<TaskId> allTaskIds = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_1_0, TASK_1_1, TASK_1_2, TASK_2_0, TASK_2_1, TASK_2_2);
final Map<TaskId, Long> lags = allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 10L));
final ClientState clientState1 = new ClientState(emptySet(), emptySet(), lags, 1);
final ClientState clientState2 = new ClientState(emptySet(), emptySet(), lags, 1);
final ClientState clientState3 = new ClientState(emptySet(), emptySet(), lags, 1);
final ClientState clientState1 = new ClientState(emptySet(), emptySet(), lags, EMPTY_CLIENT_TAGS, 1);
final ClientState clientState2 = new ClientState(emptySet(), emptySet(), lags, EMPTY_CLIENT_TAGS, 1);
final ClientState clientState3 = new ClientState(emptySet(), emptySet(), lags, EMPTY_CLIENT_TAGS, 1);
final Map<UUID, ClientState> clientStates = getClientStatesMap(clientState1, clientState2, clientState3);
final boolean unstable = new HighAvailabilityTaskAssignor().assign(
clientStates,
allTaskIds,
allTaskIds,
new AssignmentConfigs(0L, 1, 0, 60_000L)
new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertThat(unstable, is(false));
assertValidAssignment(0, allTaskIds, emptySet(), clientStates, new StringBuilder());
@ -160,15 +168,15 @@ public class HighAvailabilityTaskAssignorTest {
public void shouldAssignActiveStatefulTasksEvenlyOverClientsWhereNumberOfThreadsIntegralDivisorOfNumberOfTasks() {
final Set<TaskId> allTaskIds = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_1_0, TASK_1_1, TASK_1_2, TASK_2_0, TASK_2_1, TASK_2_2);
final Map<TaskId, Long> lags = allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 10L));
final ClientState clientState1 = new ClientState(emptySet(), emptySet(), lags, 3);
final ClientState clientState2 = new ClientState(emptySet(), emptySet(), lags, 3);
final ClientState clientState3 = new ClientState(emptySet(), emptySet(), lags, 3);
final ClientState clientState1 = new ClientState(emptySet(), emptySet(), lags, EMPTY_CLIENT_TAGS, 3);
final ClientState clientState2 = new ClientState(emptySet(), emptySet(), lags, EMPTY_CLIENT_TAGS, 3);
final ClientState clientState3 = new ClientState(emptySet(), emptySet(), lags, EMPTY_CLIENT_TAGS, 3);
final Map<UUID, ClientState> clientStates = getClientStatesMap(clientState1, clientState2, clientState3);
final boolean unstable = new HighAvailabilityTaskAssignor().assign(
clientStates,
allTaskIds,
allTaskIds,
new AssignmentConfigs(0L, 1, 0, 60_000L)
new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertThat(unstable, is(false));
assertValidAssignment(0, allTaskIds, emptySet(), clientStates, new StringBuilder());
@ -181,14 +189,14 @@ public class HighAvailabilityTaskAssignorTest {
public void shouldAssignActiveStatefulTasksEvenlyOverClientsWhereNumberOfClientsNotIntegralDivisorOfNumberOfTasks() {
final Set<TaskId> allTaskIds = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_1_0, TASK_1_1, TASK_1_2, TASK_2_0, TASK_2_1, TASK_2_2);
final Map<TaskId, Long> lags = allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 10L));
final ClientState clientState1 = new ClientState(emptySet(), emptySet(), lags, 1);
final ClientState clientState2 = new ClientState(emptySet(), emptySet(), lags, 1);
final ClientState clientState1 = new ClientState(emptySet(), emptySet(), lags, EMPTY_CLIENT_TAGS, 1);
final ClientState clientState2 = new ClientState(emptySet(), emptySet(), lags, EMPTY_CLIENT_TAGS, 1);
final Map<UUID, ClientState> clientStates = getClientStatesMap(clientState1, clientState2);
final boolean unstable = new HighAvailabilityTaskAssignor().assign(
clientStates,
allTaskIds,
allTaskIds,
new AssignmentConfigs(0L, 1, 0, 60_000L)
new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertThat(unstable, is(false));
@ -202,15 +210,15 @@ public class HighAvailabilityTaskAssignorTest {
public void shouldAssignActiveStatefulTasksEvenlyOverUnevenlyDistributedStreamThreads() {
final Set<TaskId> allTaskIds = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_1_0, TASK_1_1, TASK_1_2);
final Map<TaskId, Long> lags = allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 10L));
final ClientState clientState1 = new ClientState(emptySet(), emptySet(), lags, 1);
final ClientState clientState2 = new ClientState(emptySet(), emptySet(), lags, 2);
final ClientState clientState3 = new ClientState(emptySet(), emptySet(), lags, 3);
final ClientState clientState1 = new ClientState(emptySet(), emptySet(), lags, EMPTY_CLIENT_TAGS, 1);
final ClientState clientState2 = new ClientState(emptySet(), emptySet(), lags, EMPTY_CLIENT_TAGS, 2);
final ClientState clientState3 = new ClientState(emptySet(), emptySet(), lags, EMPTY_CLIENT_TAGS, 3);
final Map<UUID, ClientState> clientStates = getClientStatesMap(clientState1, clientState2, clientState3);
final boolean unstable = new HighAvailabilityTaskAssignor().assign(
clientStates,
allTaskIds,
allTaskIds,
new AssignmentConfigs(0L, 1, 0, 60_000L)
new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertThat(unstable, is(false));
@ -231,15 +239,15 @@ public class HighAvailabilityTaskAssignorTest {
public void shouldAssignActiveStatefulTasksEvenlyOverClientsWithMoreClientsThanTasks() {
final Set<TaskId> allTaskIds = mkSet(TASK_0_0, TASK_0_1);
final Map<TaskId, Long> lags = allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 10L));
final ClientState clientState1 = new ClientState(emptySet(), emptySet(), lags, 1);
final ClientState clientState2 = new ClientState(emptySet(), emptySet(), lags, 1);
final ClientState clientState3 = new ClientState(emptySet(), emptySet(), lags, 1);
final ClientState clientState1 = new ClientState(emptySet(), emptySet(), lags, EMPTY_CLIENT_TAGS, 1);
final ClientState clientState2 = new ClientState(emptySet(), emptySet(), lags, EMPTY_CLIENT_TAGS, 1);
final ClientState clientState3 = new ClientState(emptySet(), emptySet(), lags, EMPTY_CLIENT_TAGS, 1);
final Map<UUID, ClientState> clientStates = getClientStatesMap(clientState1, clientState2, clientState3);
final boolean unstable = new HighAvailabilityTaskAssignor().assign(
clientStates,
allTaskIds,
allTaskIds,
new AssignmentConfigs(0L, 1, 0, 60_000L)
new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertThat(unstable, is(false));
@ -253,15 +261,15 @@ public class HighAvailabilityTaskAssignorTest {
public void shouldAssignActiveStatefulTasksEvenlyOverClientsAndStreamThreadsWithEqualStreamThreadsPerClientAsTasks() {
final Set<TaskId> allTaskIds = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_1_0, TASK_1_1, TASK_1_2, TASK_2_0, TASK_2_1, TASK_2_2);
final Map<TaskId, Long> lags = allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 10L));
final ClientState clientState1 = new ClientState(emptySet(), emptySet(), lags, 9);
final ClientState clientState2 = new ClientState(emptySet(), emptySet(), lags, 9);
final ClientState clientState3 = new ClientState(emptySet(), emptySet(), lags, 9);
final ClientState clientState1 = new ClientState(emptySet(), emptySet(), lags, EMPTY_CLIENT_TAGS, 9);
final ClientState clientState2 = new ClientState(emptySet(), emptySet(), lags, EMPTY_CLIENT_TAGS, 9);
final ClientState clientState3 = new ClientState(emptySet(), emptySet(), lags, EMPTY_CLIENT_TAGS, 9);
final Map<UUID, ClientState> clientStates = getClientStatesMap(clientState1, clientState2, clientState3);
final boolean unstable = new HighAvailabilityTaskAssignor().assign(
clientStates,
allTaskIds,
allTaskIds,
new AssignmentConfigs(0L, 1, 0, 60_000L)
new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertThat(unstable, is(false));
@ -277,16 +285,16 @@ public class HighAvailabilityTaskAssignorTest {
final Map<TaskId, Long> lagsForCaughtUpClient = allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 0L));
final Map<TaskId, Long> lagsForNotCaughtUpClient =
allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> Long.MAX_VALUE));
final ClientState caughtUpClientState = new ClientState(allTaskIds, emptySet(), lagsForCaughtUpClient, 5);
final ClientState notCaughtUpClientState1 = new ClientState(emptySet(), emptySet(), lagsForNotCaughtUpClient, 5);
final ClientState notCaughtUpClientState2 = new ClientState(emptySet(), emptySet(), lagsForNotCaughtUpClient, 5);
final ClientState caughtUpClientState = new ClientState(allTaskIds, emptySet(), lagsForCaughtUpClient, EMPTY_CLIENT_TAGS, 5);
final ClientState notCaughtUpClientState1 = new ClientState(emptySet(), emptySet(), lagsForNotCaughtUpClient, EMPTY_CLIENT_TAGS, 5);
final ClientState notCaughtUpClientState2 = new ClientState(emptySet(), emptySet(), lagsForNotCaughtUpClient, EMPTY_CLIENT_TAGS, 5);
final Map<UUID, ClientState> clientStates =
getClientStatesMap(caughtUpClientState, notCaughtUpClientState1, notCaughtUpClientState2);
final boolean unstable = new HighAvailabilityTaskAssignor().assign(
clientStates,
allTaskIds,
allTaskIds,
new AssignmentConfigs(0L, allTaskIds.size() / 3 + 1, 0, 60_000L)
new AssignmentConfigs(0L, allTaskIds.size() / 3 + 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertThat(unstable, is(true));
@ -307,16 +315,16 @@ public class HighAvailabilityTaskAssignorTest {
final Map<TaskId, Long> lagsForWarmedUpClient2 =
allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> Long.MAX_VALUE));
lagsForWarmedUpClient2.put(TASK_1_0, 0L);
final ClientState caughtUpClientState = new ClientState(allTaskIds, emptySet(), lagsForCaughtUpClient, 5);
final ClientState warmedUpClientState1 = new ClientState(emptySet(), warmedUpTaskIds1, lagsForWarmedUpClient1, 5);
final ClientState warmedUpClientState2 = new ClientState(emptySet(), warmedUpTaskIds2, lagsForWarmedUpClient2, 5);
final ClientState caughtUpClientState = new ClientState(allTaskIds, emptySet(), lagsForCaughtUpClient, EMPTY_CLIENT_TAGS, 5);
final ClientState warmedUpClientState1 = new ClientState(emptySet(), warmedUpTaskIds1, lagsForWarmedUpClient1, EMPTY_CLIENT_TAGS, 5);
final ClientState warmedUpClientState2 = new ClientState(emptySet(), warmedUpTaskIds2, lagsForWarmedUpClient2, EMPTY_CLIENT_TAGS, 5);
final Map<UUID, ClientState> clientStates =
getClientStatesMap(caughtUpClientState, warmedUpClientState1, warmedUpClientState2);
final boolean unstable = new HighAvailabilityTaskAssignor().assign(
clientStates,
allTaskIds,
allTaskIds,
new AssignmentConfigs(0L, allTaskIds.size() / 3 + 1, 0, 60_000L)
new AssignmentConfigs(0L, allTaskIds.size() / 3 + 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertThat(unstable, is(false));
@ -330,14 +338,14 @@ public class HighAvailabilityTaskAssignorTest {
public void shouldAssignActiveStatefulTasksEvenlyOverStreamThreadsButBestEffortOverClients() {
final Set<TaskId> allTaskIds = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_1_0, TASK_1_1, TASK_1_2, TASK_2_0, TASK_2_1, TASK_2_2);
final Map<TaskId, Long> lags = allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 10L));
final ClientState clientState1 = new ClientState(emptySet(), emptySet(), lags, 6);
final ClientState clientState2 = new ClientState(emptySet(), emptySet(), lags, 3);
final ClientState clientState1 = new ClientState(emptySet(), emptySet(), lags, EMPTY_CLIENT_TAGS, 6);
final ClientState clientState2 = new ClientState(emptySet(), emptySet(), lags, EMPTY_CLIENT_TAGS, 3);
final Map<UUID, ClientState> clientStates = getClientStatesMap(clientState1, clientState2);
final boolean unstable = new HighAvailabilityTaskAssignor().assign(
clientStates,
allTaskIds,
allTaskIds,
new AssignmentConfigs(0L, 1, 0, 60_000L)
new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertThat(unstable, is(false));
@ -351,7 +359,7 @@ public class HighAvailabilityTaskAssignorTest {
@Test
public void shouldComputeNewAssignmentIfThereAreUnassignedActiveTasks() {
final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1);
final ClientState client1 = new ClientState(singleton(TASK_0_0), emptySet(), singletonMap(TASK_0_0, 0L), 1);
final ClientState client1 = new ClientState(singleton(TASK_0_0), emptySet(), singletonMap(TASK_0_0, 0L), EMPTY_CLIENT_TAGS, 1);
final Map<UUID, ClientState> clientStates = singletonMap(UUID_1, client1);
final boolean probingRebalanceNeeded = new HighAvailabilityTaskAssignor().assign(clientStates,
@ -373,8 +381,8 @@ public class HighAvailabilityTaskAssignorTest {
public void shouldComputeNewAssignmentIfThereAreUnassignedStandbyTasks() {
final Set<TaskId> allTasks = mkSet(TASK_0_0);
final Set<TaskId> statefulTasks = mkSet(TASK_0_0);
final ClientState client1 = new ClientState(singleton(TASK_0_0), emptySet(), singletonMap(TASK_0_0, 0L), 1);
final ClientState client2 = new ClientState(emptySet(), emptySet(), singletonMap(TASK_0_0, 0L), 1);
final ClientState client1 = new ClientState(singleton(TASK_0_0), emptySet(), singletonMap(TASK_0_0, 0L), EMPTY_CLIENT_TAGS, 1);
final ClientState client2 = new ClientState(emptySet(), emptySet(), singletonMap(TASK_0_0, 0L), EMPTY_CLIENT_TAGS, 1);
final Map<UUID, ClientState> clientStates = mkMap(mkEntry(UUID_1, client1), mkEntry(UUID_2, client2));
final boolean probingRebalanceNeeded = new HighAvailabilityTaskAssignor().assign(clientStates,
@ -394,8 +402,8 @@ public class HighAvailabilityTaskAssignorTest {
public void shouldComputeNewAssignmentIfActiveTasksWasNotOnCaughtUpClient() {
final Set<TaskId> allTasks = mkSet(TASK_0_0, TASK_0_1);
final Set<TaskId> statefulTasks = mkSet(TASK_0_0);
final ClientState client1 = new ClientState(singleton(TASK_0_0), emptySet(), singletonMap(TASK_0_0, 500L), 1);
final ClientState client2 = new ClientState(singleton(TASK_0_1), emptySet(), singletonMap(TASK_0_0, 0L), 1);
final ClientState client1 = new ClientState(singleton(TASK_0_0), emptySet(), singletonMap(TASK_0_0, 500L), EMPTY_CLIENT_TAGS, 1);
final ClientState client2 = new ClientState(singleton(TASK_0_1), emptySet(), singletonMap(TASK_0_0, 0L), EMPTY_CLIENT_TAGS, 1);
final Map<UUID, ClientState> clientStates = mkMap(
mkEntry(UUID_1, client1),
mkEntry(UUID_2, client2)
@ -490,7 +498,8 @@ public class HighAvailabilityTaskAssignorTest {
/*acceptableRecoveryLag*/ 100L,
/*maxWarmupReplicas*/ 1,
/*numStandbyReplicas*/ 0,
/*probingRebalanceIntervalMs*/ 60 * 1000L
/*probingRebalanceIntervalMs*/ 60 * 1000L,
/*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
)
);
@ -518,7 +527,8 @@ public class HighAvailabilityTaskAssignorTest {
/*acceptableRecoveryLag*/ 100L,
/*maxWarmupReplicas*/ 1,
/*numStandbyReplicas*/ 1,
/*probingRebalanceIntervalMs*/ 60 * 1000L
/*probingRebalanceIntervalMs*/ 60 * 1000L,
/*rackAwareAssignmentTags*/ EMPTY_RACK_AWARE_ASSIGNMENT_TAGS
)
);
@ -621,9 +631,9 @@ public class HighAvailabilityTaskAssignorTest {
mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3, TASK_1_0, TASK_1_1, TASK_1_2, TASK_1_3, TASK_2_0); // 9 total
final Map<TaskId, Long> allTaskLags = allTasks.stream().collect(Collectors.toMap(t -> t, t -> 0L));
final Set<TaskId> statefulTasks = new HashSet<>(allTasks);
final ClientState client1 = new ClientState(emptySet(), emptySet(), allTaskLags, 100);
final ClientState client2 = new ClientState(emptySet(), emptySet(), allTaskLags, 50);
final ClientState client3 = new ClientState(emptySet(), emptySet(), allTaskLags, 1);
final ClientState client1 = new ClientState(emptySet(), emptySet(), allTaskLags, EMPTY_CLIENT_TAGS, 100);
final ClientState client2 = new ClientState(emptySet(), emptySet(), allTaskLags, EMPTY_CLIENT_TAGS, 50);
final ClientState client3 = new ClientState(emptySet(), emptySet(), allTaskLags, EMPTY_CLIENT_TAGS, 1);
final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2, client3);
@ -687,9 +697,9 @@ public class HighAvailabilityTaskAssignorTest {
final Set<TaskId> statelessTasks = new HashSet<>(allTasks);
final Map<TaskId, Long> taskLags = new HashMap<>();
final ClientState client1 = new ClientState(emptySet(), emptySet(), taskLags, 7);
final ClientState client2 = new ClientState(emptySet(), emptySet(), taskLags, 7);
final ClientState client3 = new ClientState(emptySet(), emptySet(), taskLags, 7);
final ClientState client1 = new ClientState(emptySet(), emptySet(), taskLags, EMPTY_CLIENT_TAGS, 7);
final ClientState client2 = new ClientState(emptySet(), emptySet(), taskLags, EMPTY_CLIENT_TAGS, 7);
final ClientState client3 = new ClientState(emptySet(), emptySet(), taskLags, EMPTY_CLIENT_TAGS, 7);
final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2, client3);
@ -697,7 +707,7 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTasks,
statefulTasks,
new AssignmentConfigs(0L, 1, 0, 60_000L)
new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertValidAssignment(
@ -718,9 +728,9 @@ public class HighAvailabilityTaskAssignorTest {
final Set<TaskId> statelessTasks = new HashSet<>(allTasks);
final Map<TaskId, Long> taskLags = new HashMap<>();
final ClientState client1 = new ClientState(emptySet(), emptySet(), taskLags, 2);
final ClientState client2 = new ClientState(emptySet(), emptySet(), taskLags, 2);
final ClientState client3 = new ClientState(emptySet(), emptySet(), taskLags, 2);
final ClientState client1 = new ClientState(emptySet(), emptySet(), taskLags, EMPTY_CLIENT_TAGS, 2);
final ClientState client2 = new ClientState(emptySet(), emptySet(), taskLags, EMPTY_CLIENT_TAGS, 2);
final ClientState client3 = new ClientState(emptySet(), emptySet(), taskLags, EMPTY_CLIENT_TAGS, 2);
final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2, client3);
@ -728,7 +738,7 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTasks,
statefulTasks,
new AssignmentConfigs(0L, 1, 0, 60_000L)
new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertValidAssignment(
@ -749,9 +759,9 @@ public class HighAvailabilityTaskAssignorTest {
final Set<TaskId> statelessTasks = new HashSet<>(allTasks);
final Map<TaskId, Long> taskLags = new HashMap<>();
final ClientState client1 = new ClientState(emptySet(), emptySet(), taskLags, 1);
final ClientState client2 = new ClientState(emptySet(), emptySet(), taskLags, 2);
final ClientState client3 = new ClientState(emptySet(), emptySet(), taskLags, 3);
final ClientState client1 = new ClientState(emptySet(), emptySet(), taskLags, EMPTY_CLIENT_TAGS, 1);
final ClientState client2 = new ClientState(emptySet(), emptySet(), taskLags, EMPTY_CLIENT_TAGS, 2);
final ClientState client3 = new ClientState(emptySet(), emptySet(), taskLags, EMPTY_CLIENT_TAGS, 3);
final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2, client3);
@ -759,7 +769,7 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTasks,
statefulTasks,
new AssignmentConfigs(0L, 1, 0, 60_000L)
new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertValidAssignment(
@ -780,9 +790,9 @@ public class HighAvailabilityTaskAssignorTest {
final Set<TaskId> statelessTasks = new HashSet<>(allTasks);
final Map<TaskId, Long> taskLags = new HashMap<>();
final ClientState client1 = new ClientState(statelessTasks, emptySet(), taskLags, 3);
final ClientState client2 = new ClientState(emptySet(), emptySet(), taskLags, 3);
final ClientState client3 = new ClientState(emptySet(), emptySet(), taskLags, 3);
final ClientState client1 = new ClientState(statelessTasks, emptySet(), taskLags, EMPTY_CLIENT_TAGS, 3);
final ClientState client2 = new ClientState(emptySet(), emptySet(), taskLags, EMPTY_CLIENT_TAGS, 3);
final ClientState client3 = new ClientState(emptySet(), emptySet(), taskLags, EMPTY_CLIENT_TAGS, 3);
final Map<UUID, ClientState> clientStates = getClientStatesMap(client1, client2, client3);
@ -790,7 +800,7 @@ public class HighAvailabilityTaskAssignorTest {
clientStates,
allTasks,
statefulTasks,
new AssignmentConfigs(0L, 1, 0, 60_000L)
new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertValidAssignment(
@ -804,6 +814,27 @@ public class HighAvailabilityTaskAssignorTest {
assertThat(probingRebalanceNeeded, is(false));
}
@Test
public void shouldReturnClientTagAwareStandbyTaskAssignorWhenRackAwareAssignmentTagsIsSet() {
final StandbyTaskAssignor standbyTaskAssignor = HighAvailabilityTaskAssignor.createStandbyTaskAssignor(newAssignmentConfigs(1, singletonList("az")));
assertTrue(standbyTaskAssignor instanceof ClientTagAwareStandbyTaskAssignor);
}
@Test
public void shouldReturnDefaultStandbyTaskAssignorWhenRackAwareAssignmentTagsIsEmpty() {
final StandbyTaskAssignor standbyTaskAssignor = HighAvailabilityTaskAssignor.createStandbyTaskAssignor(newAssignmentConfigs(1, Collections.emptyList()));
assertTrue(standbyTaskAssignor instanceof DefaultStandbyTaskAssignor);
}
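The two tests above pin down how createStandbyTaskAssignor dispatches on the configuration. Its body is not part of this hunk; a sketch consistent with the tests (an illustration, not necessarily the committed code) could be:

static StandbyTaskAssignor createStandbyTaskAssignor(final AssignorConfiguration.AssignmentConfigs configs) {
    // Rack-aware standby placement only applies when at least one assignment tag is configured.
    if (!configs.rackAwareAssignmentTags.isEmpty()) {
        return new ClientTagAwareStandbyTaskAssignor();
    }
    return new DefaultStandbyTaskAssignor();
}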
private static AssignorConfiguration.AssignmentConfigs newAssignmentConfigs(final int numStandbyReplicas,
final List<String> rackAwareAssignmentTags) {
return new AssignorConfiguration.AssignmentConfigs(0L,
1,
numStandbyReplicas,
60000L,
rackAwareAssignmentTags);
}
private static void assertHasNoActiveTasks(final ClientState... clients) {
for (final ClientState client : clients) {
assertThat(client.activeTasks(), is(empty()));
@ -829,6 +860,6 @@ public class HighAvailabilityTaskAssignorTest {
taskLags.put(task, Long.MAX_VALUE);
}
}
return new ClientState(statefulActiveTasks, emptySet(), taskLags, 1);
return new ClientState(statefulActiveTasks, emptySet(), taskLags, EMPTY_CLIENT_TAGS, 1);
}
}

View File

@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals.assignment;
import org.apache.kafka.streams.processor.TaskId;
import org.junit.Before;
import org.junit.Test;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_1;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_2;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getClientStatesMap;
import static org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.computeTasksToRemainingStandbys;
import static org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertTrue;
public class StandbyTaskAssignmentUtilsTest {
private static final Set<TaskId> ACTIVE_TASKS = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
private Map<UUID, ClientState> clients;
private ConstrainedPrioritySet clientsByTaskLoad;
@Before
public void setup() {
clients = getClientStatesMap(ACTIVE_TASKS.stream().map(StandbyTaskAssignmentUtilsTest::mkState).toArray(ClientState[]::new));
clientsByTaskLoad = new ConstrainedPrioritySet(
(client, task) -> !clients.get(client).hasAssignedTask(task),
client -> clients.get(client).assignedTaskLoad()
);
clientsByTaskLoad.offerAll(clients.keySet());
}
@Test
public void shouldReturnNumberOfStandbyTasksThatWereNotAssigned() {
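// Three standbys are requested per task, but with only three clients (one of which already hosts
// the active copy) at most two standbys fit per task, so exactly one standby remains unassigned.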
final Map<TaskId, Integer> tasksToRemainingStandbys = computeTasksToRemainingStandbys(3, ACTIVE_TASKS);
assertTrue(tasksToRemainingStandbys.keySet()
.stream()
.map(taskId -> pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks(
clients,
tasksToRemainingStandbys,
clientsByTaskLoad,
taskId
))
.allMatch(numRemainingStandbys -> numRemainingStandbys == 1));
assertTrue(ACTIVE_TASKS.stream().allMatch(activeTask -> tasksToRemainingStandbys.get(activeTask) == 1));
assertTrue(areStandbyTasksPresentForAllActiveTasks(2));
}
@Test
public void shouldReturnZeroWhenAllStandbyTasksWereSuccessfullyAssigned() {
final Map<TaskId, Integer> tasksToRemainingStandbys = computeTasksToRemainingStandbys(1, ACTIVE_TASKS);
assertTrue(tasksToRemainingStandbys.keySet()
.stream()
.map(taskId -> pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks(
clients,
tasksToRemainingStandbys,
clientsByTaskLoad,
taskId
))
.allMatch(numRemainingStandbys -> numRemainingStandbys == 0));
assertTrue(ACTIVE_TASKS.stream().allMatch(activeTask -> tasksToRemainingStandbys.get(activeTask) == 0));
assertTrue(areStandbyTasksPresentForAllActiveTasks(1));
}
@Test
public void shouldComputeTasksToRemainingStandbys() {
assertThat(
computeTasksToRemainingStandbys(0, ACTIVE_TASKS),
equalTo(
ACTIVE_TASKS.stream().collect(Collectors.toMap(Function.identity(), it -> 0))
)
);
assertThat(
computeTasksToRemainingStandbys(5, ACTIVE_TASKS),
equalTo(
ACTIVE_TASKS.stream().collect(Collectors.toMap(Function.identity(), it -> 5))
)
);
}
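Taken together, the assertions above fully describe computeTasksToRemainingStandbys: it seeds every active task with the configured number of standby replicas. A sketch that satisfies them (the committed utility may differ in details):

static Map<TaskId, Integer> computeTasksToRemainingStandbys(final int numStandbyReplicas,
                                                            final Set<TaskId> activeTasks) {
    // Every active task starts out owing numStandbyReplicas standby copies.
    return activeTasks.stream().collect(Collectors.toMap(Function.identity(), task -> numStandbyReplicas));
}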
private boolean areStandbyTasksPresentForAllActiveTasks(final int expectedNumberOfStandbyTasks) {
return ACTIVE_TASKS.stream().allMatch(taskId -> clients.values()
.stream()
.filter(client -> client.hasStandbyTask(taskId))
.count() == expectedNumberOfStandbyTasks);
}
private static ClientState mkState(final TaskId... activeTasks) {
return mkState(1, activeTasks);
}
private static ClientState mkState(final int capacity, final TaskId... activeTasks) {
final ClientState clientState = new ClientState(capacity);
for (final TaskId activeTask : activeTasks) {
clientState.assignActive(activeTask);
}
return clientState;
}
}

View File

@ -33,6 +33,7 @@ import java.util.UUID;
import static java.util.Arrays.asList;
import static java.util.Collections.singleton;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_1;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_2;
@ -676,7 +677,7 @@ public class StickyTaskAssignorTest {
clients,
new HashSet<>(taskIds),
new HashSet<>(taskIds),
new AssignorConfiguration.AssignmentConfigs(0L, 1, 0, 60_000L)
new AssignorConfiguration.AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertThat(probingRebalanceNeeded, is(false));
@ -695,7 +696,7 @@ public class StickyTaskAssignorTest {
clients,
new HashSet<>(taskIds),
new HashSet<>(taskIds),
new AssignorConfiguration.AssignmentConfigs(0L, 1, numStandbys, 60_000L)
new AssignorConfiguration.AssignmentConfigs(0L, 1, numStandbys, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
}

View File

@ -29,6 +29,7 @@ import java.util.TreeSet;
import java.util.UUID;
import java.util.function.Supplier;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.appendClientStates;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertBalancedActiveAssignment;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertBalancedStatefulAssignment;
@ -231,7 +232,8 @@ public class TaskAssignorConvergenceTest {
final AssignmentConfigs configs = new AssignmentConfigs(100L,
2,
0,
60_000L);
60_000L,
EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
final Harness harness = Harness.initializeCluster(1, 1, 1, () -> 1);
@ -250,7 +252,8 @@ public class TaskAssignorConvergenceTest {
final AssignmentConfigs configs = new AssignmentConfigs(100L,
maxWarmupReplicas,
numStandbyReplicas,
60_000L);
60_000L,
EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
final Harness harness = Harness.initializeCluster(numStatelessTasks, numStatefulTasks, 1, () -> 5);
testForConvergence(harness, configs, 1);
@ -272,7 +275,8 @@ public class TaskAssignorConvergenceTest {
final AssignmentConfigs configs = new AssignmentConfigs(100L,
maxWarmupReplicas,
numStandbyReplicas,
60_000L);
60_000L,
EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
final Harness harness = Harness.initializeCluster(numStatelessTasks, numStatefulTasks, 7, () -> 5);
testForConvergence(harness, configs, 1);
@ -313,7 +317,8 @@ public class TaskAssignorConvergenceTest {
final AssignmentConfigs configs = new AssignmentConfigs(100L,
maxWarmupReplicas,
numStandbyReplicas,
60_000L);
60_000L,
EMPTY_RACK_AWARE_ASSIGNMENT_TAGS);
harness = Harness.initializeCluster(
numStatelessTasks,