KAFKA-6718 / Add rack awareness configurations to StreamsConfig (#11837)

This PR is part of KIP-708, which adds rack-aware standby task assignment to Kafka Streams.

Rack-aware standby task assignment won't be functional until all parts of this KIP are merged.

The work is split into three smaller PRs to make the review process easier to follow. The overall plan is the following:

⏭️ Rack aware standby task assignment logic #10851
⏭️ Protocol change, add clientTags to SubscriptionInfoData #10802
👉 Add the required configurations to StreamsConfig (public API change; with this, the feature is fully functional)

This PR implements the last point of the plan above. Example usage of the new configuration is sketched below.
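
For reference, wiring the new public API together looks roughly like this (application id and bootstrap servers are placeholders):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "rack-aware-app");      // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
// Tag this instance with its physical location using the new client.tag. prefix.
props.put(StreamsConfig.clientTagPrefix("zone"), "eu-central-1a");
props.put(StreamsConfig.clientTagPrefix("cluster"), "k8s-cluster-1");
// Ask the assignor to spread standby tasks over the listed tag dimensions.
props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, "zone,cluster");
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);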

Reviewers: Luke Chen <showuon@gmail.com>, Bruno Cadonna <cadonna@apache.org>
Levani Kokhreidze 2022-03-16 19:02:24 +02:00 committed by GitHub
parent 5c1dd493d6
commit b68463c250
21 changed files with 944 additions and 119 deletions

View File

@ -1121,6 +1121,27 @@ public class ConfigDef {
}
}
public static class ListSize implements Validator {
final int maxSize;
private ListSize(final int maxSize) {
this.maxSize = maxSize;
}
public static ListSize atMostOfSize(final int maxSize) {
return new ListSize(maxSize);
}
@Override
public void ensureValid(final String name, final Object value) {
@SuppressWarnings("unchecked")
List<String> values = (List<String>) value;
if (values.size() > maxSize) {
throw new ConfigException(name, value, "exceeds maximum list size of [" + maxSize + "].");
}
}
}
public static class ConfigKey {
public final String name;
public final Type type;

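The new validator composes with ConfigDef.define like the existing Range and ValidString validators; a minimal sketch (the config name and doc text are made up):

import java.util.Collections;
import org.apache.kafka.common.config.ConfigDef;

final ConfigDef def = new ConfigDef()
    .define("tags",                                  // hypothetical config name
            ConfigDef.Type.LIST,
            Collections.emptyList(),
            ConfigDef.ListSize.atMostOfSize(5),
            ConfigDef.Importance.MEDIUM,
            "A list capped at five entries.");
// Parsing a six-element list for "tags" now fails with a ConfigException:
// "Invalid value [a, b, c, d, e, f] for configuration tags: exceeds maximum list size of [5]."
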
View File

@ -18,6 +18,7 @@ package org.apache.kafka.common.config;
import org.apache.kafka.common.config.ConfigDef.CaseInsensitiveValidString;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.ListSize;
import org.apache.kafka.common.config.ConfigDef.Range;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.ValidString;
@ -38,6 +39,8 @@ import java.util.Properties;
import java.util.Set;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
@ -426,7 +429,7 @@ public class ConfigDefTest {
public void testMissingDependentConfigs() {
// Should not be possible to parse a config if a dependent config has not been defined
final ConfigDef configDef = new ConfigDef()
- .define("parent", Type.STRING, Importance.HIGH, "parent docs", "group", 1, Width.LONG, "Parent", Collections.singletonList("child"));
+ .define("parent", Type.STRING, Importance.HIGH, "parent docs", "group", 1, Width.LONG, "Parent", singletonList("child"));
assertThrows(ConfigException.class, () -> configDef.parse(Collections.emptyMap()));
}
@ -438,7 +441,7 @@ public class ConfigDefTest {
assertEquals(new HashSet<>(Arrays.asList("a")), baseConfigDef.getConfigsWithNoParent());
final ConfigDef configDef = new ConfigDef(baseConfigDef)
- .define("parent", Type.STRING, Importance.HIGH, "parent docs", "group", 1, Width.LONG, "Parent", Collections.singletonList("child"))
+ .define("parent", Type.STRING, Importance.HIGH, "parent docs", "group", 1, Width.LONG, "Parent", singletonList("child"))
.define("child", Type.STRING, Importance.HIGH, "docs");
assertEquals(new HashSet<>(Arrays.asList("a", "parent")), configDef.getConfigsWithNoParent());
@ -541,7 +544,7 @@ public class ConfigDefTest {
.define("opt2.of.group2", Type.BOOLEAN, false, Importance.HIGH, "Doc doc doc doc.",
"Group Two", 1, Width.NONE, "..", Collections.<String>emptyList())
.define("opt1.of.group2", Type.BOOLEAN, false, Importance.HIGH, "Doc doc doc doc doc.",
- "Group Two", 0, Width.NONE, "..", Collections.singletonList("some.option"))
+ "Group Two", 0, Width.NONE, "..", singletonList("some.option"))
.define("poor.opt", Type.STRING, "foo", Importance.HIGH, "Doc doc doc doc.");
final String expectedRst = "" +
@ -722,4 +725,37 @@ public class ConfigDefTest {
assertEquals(" (365 days)", ConfigDef.niceTimeUnits(Duration.ofDays(365).toMillis()));
}
@Test
public void testThrowsExceptionWhenListSizeExceedsLimit() {
final ConfigException exception = assertThrows(ConfigException.class, () -> new ConfigDef().define("lst",
Type.LIST,
asList("a", "b"),
ListSize.atMostOfSize(1),
Importance.HIGH,
"lst doc"));
assertEquals("Invalid value [a, b] for configuration lst: exceeds maximum list size of [1].",
exception.getMessage());
}
@Test
public void testNoExceptionIsThrownWhenListSizeEqualsTheLimit() {
final List<String> lst = asList("a", "b", "c");
assertDoesNotThrow(() -> new ConfigDef().define("lst",
Type.LIST,
lst,
ListSize.atMostOfSize(lst.size()),
Importance.HIGH,
"lst doc"));
}
@Test
public void testNoExceptionIsThrownWhenListSizeIsBelowTheLimit() {
assertDoesNotThrow(() -> new ConfigDef().define("lst",
Type.LIST,
asList("a", "b"),
ListSize.atMostOfSize(3),
Importance.HIGH,
"lst doc"));
}
}

View File

@ -48,12 +48,16 @@ import java.io.File;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.kafka.common.IsolationLevel.READ_COMMITTED;
import static org.apache.kafka.common.config.ConfigDef.ListSize.atMostOfSize;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Range.between;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
@ -148,6 +152,12 @@ public class StreamsConfig extends AbstractConfig {
public static final int DUMMY_THREAD_INDEX = 1;
public static final long MAX_TASK_IDLE_MS_DISABLED = -1;
// We impose these limitations because client tags are encoded into the subscription info,
// which is part of the group metadata message that is persisted into the internal topic.
public static final int MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE = 5;
public static final int MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH = 20;
public static final int MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH = 30;
/**
* Prefix used to provide default topic configs to be applied when creating internal topics.
* These should be valid properties from {@link org.apache.kafka.common.config.TopicConfig TopicConfig}.
@ -212,6 +222,15 @@ public class StreamsConfig extends AbstractConfig {
@SuppressWarnings("WeakerAccess")
public static final String ADMIN_CLIENT_PREFIX = "admin.";
/**
* Prefix used to add arbitrary tags to a Kafka Streams instance as key-value pairs.
* Example:
* client.tag.zone=zone1
* client.tag.cluster=cluster1
*/
@SuppressWarnings("WeakerAccess")
public static final String CLIENT_TAG_PREFIX = "client.tag.";
/**
* Config value for parameter {@link #TOPOLOGY_OPTIMIZATION_CONFIG "topology.optimization"} for disabling topology optimization
*/
@ -511,6 +530,13 @@ public class StreamsConfig extends AbstractConfig {
@SuppressWarnings("WeakerAccess")
public static final String RECEIVE_BUFFER_CONFIG = CommonClientConfigs.RECEIVE_BUFFER_CONFIG;
/** {@code rack.aware.assignment.tags} */
@SuppressWarnings("WeakerAccess")
public static final String RACK_AWARE_ASSIGNMENT_TAGS_CONFIG = "rack.aware.assignment.tags";
private static final String RACK_AWARE_ASSIGNMENT_TAGS_DOC = "List of client tag keys used to distribute standby replicas across Kafka Streams instances." +
" When configured, Kafka Streams will make a best effort to distribute" +
" the standby tasks over each client tag dimension.";
/** {@code reconnect.backoff.ms} */
@SuppressWarnings("WeakerAccess")
public static final String RECONNECT_BACKOFF_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG;
@ -726,6 +752,12 @@ public class StreamsConfig extends AbstractConfig {
in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, EXACTLY_ONCE_V2),
Importance.MEDIUM,
PROCESSING_GUARANTEE_DOC)
.define(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
Type.LIST,
Collections.emptyList(),
atMostOfSize(MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE),
Importance.MEDIUM,
RACK_AWARE_ASSIGNMENT_TAGS_DOC)
.define(REPLICATION_FACTOR_CONFIG,
Type.INT,
-1,
@ -1040,6 +1072,16 @@ public class StreamsConfig extends AbstractConfig {
return RESTORE_CONSUMER_PREFIX + consumerProp;
}
/**
* Prefix a client tag key with {@link #CLIENT_TAG_PREFIX}.
*
* @param clientTagKey client tag key
* @return {@link #CLIENT_TAG_PREFIX} + {@code clientTagKey}
*/
public static String clientTagPrefix(final String clientTagKey) {
return CLIENT_TAG_PREFIX + clientTagKey;
}
/**
* Prefix a property with {@link #GLOBAL_CONSUMER_PREFIX}. This is used to isolate {@link ConsumerConfig global consumer configs}
* from other client configs.
@ -1159,9 +1201,43 @@ public class StreamsConfig extends AbstractConfig {
configUpdates.put(COMMIT_INTERVAL_MS_CONFIG, EOS_DEFAULT_COMMIT_INTERVAL_MS);
}
validateRackAwarenessConfiguration();
return configUpdates;
}
private void validateRackAwarenessConfiguration() {
final List<String> rackAwareAssignmentTags = getList(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG);
final Map<String, String> clientTags = getClientTags();
if (clientTags.size() > MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE) {
throw new ConfigException("At most " + MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE + " client tags " +
"can be specified using " + CLIENT_TAG_PREFIX + " prefix.");
}
for (final String rackAwareAssignmentTag : rackAwareAssignmentTags) {
if (!clientTags.containsKey(rackAwareAssignmentTag)) {
throw new ConfigException(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
rackAwareAssignmentTags,
"Contains invalid value [" + rackAwareAssignmentTag + "] " +
"which doesn't have corresponding tag set via [" + CLIENT_TAG_PREFIX + "] prefix.");
}
}
clientTags.forEach((tagKey, tagValue) -> {
if (tagKey.length() > MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH) {
throw new ConfigException(CLIENT_TAG_PREFIX,
tagKey,
"Tag key exceeds maximum length of " + MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH + ".");
}
if (tagValue.length() > MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH) {
throw new ConfigException(CLIENT_TAG_PREFIX,
tagValue,
"Tag value exceeds maximum length of " + MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH + ".");
}
});
}
private Map<String, Object> getCommonConsumerConfigs() {
final Map<String, Object> clientProvidedProps = getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());
@ -1295,6 +1371,7 @@ public class StreamsConfig extends AbstractConfig {
consumerProps.put(PROBING_REBALANCE_INTERVAL_MS_CONFIG, getLong(PROBING_REBALANCE_INTERVAL_MS_CONFIG));
consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamsPartitionAssignor.class.getName());
consumerProps.put(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
consumerProps.put(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, getList(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG));
// disable auto topic creation
consumerProps.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
@ -1441,6 +1518,21 @@ public class StreamsConfig extends AbstractConfig {
return props;
}
/**
* Get the configured client tags set with {@link #CLIENT_TAG_PREFIX} prefix.
*
* @return Map of the client tags.
*/
@SuppressWarnings("WeakerAccess")
public Map<String, String> getClientTags() {
return originalsWithPrefix(CLIENT_TAG_PREFIX).entrySet().stream().collect(
Collectors.toMap(
Map.Entry::getKey,
tagEntry -> Objects.toString(tagEntry.getValue())
)
);
}
private Map<String, Object> getClientPropsWithPrefix(final String prefix,
final Set<String> configNames) {
final Map<String, Object> props = clientProps(configNames, originals());

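Putting the prefix scan and the validation together (a sketch, reusing the props built in the example in the PR description above):

import java.util.Map;

final StreamsConfig config = new StreamsConfig(props);
// originalsWithPrefix(CLIENT_TAG_PREFIX) strips "client.tag.", so the keys come back bare:
final Map<String, String> clientTags = config.getClientTags();
// clientTags -> {zone=eu-central-1a, cluster=k8s-cluster-1}

// Every entry of rack.aware.assignment.tags must have a matching client.tag.<key>,
// and validateRackAwarenessConfiguration runs while the config is constructed:
props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, "rack");
// new StreamsConfig(props) -> ConfigException: "... Contains invalid value [rack] ..."
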
View File

@ -345,6 +345,7 @@ public class StreamThread extends Thread {
referenceContainer.adminClient = adminClient;
referenceContainer.streamsMetadataState = streamsMetadataState;
referenceContainer.time = time;
referenceContainer.clientTags = config.getClientTags();
log.info("Creating restore consumer client");
final Map<String, Object> restoreConsumerConfigs = config.getRestoreConsumerConfigs(getRestoreConsumerClientId(threadId));

View File

@ -128,7 +128,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
private final ClientState state;
private final SortedSet<String> consumers;
- ClientMetadata(final String endPoint) {
+ ClientMetadata(final String endPoint, final Map<String, String> clientTags) {
// get the host info, or null if no endpoint is configured (ie endPoint == null)
hostInfo = HostInfo.buildFromEndpoint(endPoint);
@ -136,8 +136,8 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
// initialize the consumer memberIds
consumers = new TreeSet<>();
- // initialize the client state
- state = new ClientState();
+ // initialize the client state with client tags
+ state = new ClientState(clientTags);
}
void addConsumer(final String consumerMemberId, final List<TopicPartition> ownedPartitions) {
@ -189,6 +189,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
private Supplier<TaskAssignor> taskAssignorSupplier;
private byte uniqueField;
private Map<String, String> clientTags;
/**
* We need to have the PartitionAssignor and its StreamThread to be mutually accessible since the former needs
@ -223,6 +224,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
taskAssignorSupplier = assignorConfiguration::taskAssignor;
assignmentListener = assignorConfiguration.assignmentListener();
uniqueField = 0;
clientTags = referenceContainer.clientTags;
}
@Override
@ -265,7 +267,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
taskOffsetSums,
uniqueField,
assignmentErrorCode.get(),
- Collections.emptyMap()
+ clientTags
).encode();
}
@ -338,7 +340,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
futureMetadataVersion = usedVersion;
processId = FUTURE_ID;
if (!clientMetadataMap.containsKey(FUTURE_ID)) {
- clientMetadataMap.put(FUTURE_ID, new ClientMetadata(null));
+ clientMetadataMap.put(FUTURE_ID, new ClientMetadata(null, Collections.emptyMap()));
}
} else {
processId = info.processId();
@ -348,7 +350,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
// create the new client metadata if necessary
if (clientMetadata == null) {
- clientMetadata = new ClientMetadata(info.userEndPoint());
+ clientMetadata = new ClientMetadata(info.userEndPoint(), info.clientTags());
clientMetadataMap.put(info.processId(), clientMetadata);
}
@ -1462,6 +1464,10 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
return uniqueField;
}
protected Map<String, String> clientTags() {
return clientTags;
}
protected void handleRebalanceStart(final Set<String> topics) {
taskManager.handleRebalanceStart(topics);
}

View File

@ -29,7 +29,6 @@ import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.slf4j.Logger;
- import java.util.Collections;
import java.util.List;
import java.util.Map;
@ -250,7 +249,7 @@ public final class AssignorConfiguration {
maxWarmupReplicas = configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG);
numStandbyReplicas = configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
probingRebalanceIntervalMs = configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG);
- rackAwareAssignmentTags = Collections.emptyList();
+ rackAwareAssignmentTags = configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG);
}
AssignmentConfigs(final Long acceptableRecoveryLag,
@ -262,7 +261,7 @@ public final class AssignorConfiguration {
this.maxWarmupReplicas = validated(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, maxWarmupReplicas);
this.numStandbyReplicas = validated(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, numStandbyReplicas);
this.probingRebalanceIntervalMs = validated(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, probingRebalanceIntervalMs);
- this.rackAwareAssignmentTags = rackAwareAssignmentTags;
+ this.rackAwareAssignmentTags = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, rackAwareAssignmentTags);
}
private static <T> T validated(final String configKey, final T value) {
@ -280,6 +279,7 @@ public final class AssignorConfiguration {
"\n maxWarmupReplicas=" + maxWarmupReplicas +
"\n numStandbyReplicas=" + numStandbyReplicas +
"\n probingRebalanceIntervalMs=" + probingRebalanceIntervalMs +
"\n rackAwareAssignmentTags=" + rackAwareAssignmentTags +
"\n}";
}
}

View File

@ -29,17 +29,16 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
- import java.util.stream.Collectors;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
+ import java.util.stream.Collectors;
import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
import static java.util.Collections.unmodifiableSet;
import static java.util.Comparator.comparing;
import static java.util.Comparator.comparingLong;
import static org.apache.kafka.common.utils.Utils.union;
import static org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.UNKNOWN_OFFSET_SUM;
@ -65,6 +64,10 @@ public class ClientState {
this(0);
}
public ClientState(final Map<String, String> clientTags) {
this(0, clientTags);
}
ClientState(final int capacity) {
this(capacity, Collections.emptyMap());
}
@ -422,6 +425,7 @@ public class ClientState {
") prevStandbyTasks: (" + previousStandbyTasks.taskIds() +
") changelogOffsetTotalsByTask: (" + taskOffsetSums.entrySet() +
") taskLagTotals: (" + taskLagTotals.entrySet() +
") clientTags: (" + clientTags.entrySet() +
") capacity: " + capacity +
" assigned: " + assignedTaskCount() +
"]";

View File

@ -60,7 +60,7 @@ class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor {
final Map<TaskId, Integer> tasksToRemainingStandbys = computeTasksToRemainingStandbys(
numStandbyReplicas,
- allTaskIds
+ statefulTaskIds
);
final Map<String, Set<String>> tagKeyToValues = new HashMap<>();
@ -79,6 +79,7 @@ class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor {
if (clientState.activeTasks().contains(statefulTaskId)) {
assignStandbyTasksToClientsWithDifferentTags(
numStandbyReplicas,
standbyTaskClientsByTaskLoad,
statefulTaskId,
clientId,
@ -94,17 +95,10 @@ class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor {
}
if (!tasksToRemainingStandbys.isEmpty()) {
- log.debug("Rack aware standby task assignment was not able to assign all standby tasks. " +
- "tasksToRemainingStandbys=[{}], pendingStandbyTasksToClientId=[{}]. " +
- "Will distribute the remaining standby tasks to least loaded clients.",
- tasksToRemainingStandbys, pendingStandbyTasksToClientId);
assignPendingStandbyTasksToLeastLoadedClients(clients,
numStandbyReplicas,
- rackAwareAssignmentTags,
standbyTaskClientsByTaskLoad,
- tasksToRemainingStandbys,
- pendingStandbyTasksToClientId);
+ tasksToRemainingStandbys);
}
// returning false, because standby task assignment will never require a follow-up probing rebalance.
@ -113,34 +107,22 @@ class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor {
private static void assignPendingStandbyTasksToLeastLoadedClients(final Map<UUID, ClientState> clients,
final int numStandbyReplicas,
- final Set<String> rackAwareAssignmentTags,
final ConstrainedPrioritySet standbyTaskClientsByTaskLoad,
- final Map<TaskId, Integer> pendingStandbyTaskToNumberRemainingStandbys,
- final Map<TaskId, UUID> pendingStandbyTaskToClientId) {
+ final Map<TaskId, Integer> pendingStandbyTaskToNumberRemainingStandbys) {
// We need to re-offer all the clients to find the least loaded ones
standbyTaskClientsByTaskLoad.offerAll(clients.keySet());
for (final Entry<TaskId, Integer> pendingStandbyTaskAssignmentEntry : pendingStandbyTaskToNumberRemainingStandbys.entrySet()) {
final TaskId activeTaskId = pendingStandbyTaskAssignmentEntry.getKey();
- final UUID clientId = pendingStandbyTaskToClientId.get(activeTaskId);
- final int numberOfRemainingStandbys = pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks(
+ pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks(
+ numStandbyReplicas,
clients,
pendingStandbyTaskToNumberRemainingStandbys,
standbyTaskClientsByTaskLoad,
- activeTaskId
+ activeTaskId,
+ log
);
- if (numberOfRemainingStandbys > 0) {
- log.warn("Unable to assign {} of {} standby tasks for task [{}] with client tags [{}]. " +
- "There is not enough available capacity. You should " +
- "increase the number of application instances " +
- "on different client tag dimensions " +
- "to maintain the requested number of standby replicas. " +
- "Rack awareness is configured with [{}] tags.",
- numberOfRemainingStandbys, numStandbyReplicas, activeTaskId,
- clients.get(clientId).clientTags(), rackAwareAssignmentTags);
- }
}
}
@ -174,7 +156,8 @@ class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor {
}
// Visible for testing
- static void assignStandbyTasksToClientsWithDifferentTags(final ConstrainedPrioritySet standbyTaskClientsByTaskLoad,
+ static void assignStandbyTasksToClientsWithDifferentTags(final int numberOfStandbyClients,
+ final ConstrainedPrioritySet standbyTaskClientsByTaskLoad,
final TaskId activeTaskId,
final UUID activeTaskClient,
final Set<String> rackAwareAssignmentTags,
@ -211,17 +194,32 @@ class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor {
break;
}
- clientStates.get(clientOnUnusedTagDimensions).assignStandby(activeTaskId);
+ final ClientState clientStateOnUsedTagDimensions = clientStates.get(clientOnUnusedTagDimensions);
countOfUsedClients++;
numRemainingStandbys--;
+ log.debug("Assigning {} out of {} standby tasks for an active task [{}] with client tags {}. " +
+ "Standby task client tags are {}.",
+ numberOfStandbyClients - numRemainingStandbys, numberOfStandbyClients, activeTaskId,
+ clientStates.get(activeTaskClient).clientTags(), clientStateOnUsedTagDimensions.clientTags());
+ clientStateOnUsedTagDimensions.assignStandby(activeTaskId);
lastUsedClient = clientOnUnusedTagDimensions;
} while (numRemainingStandbys > 0);
if (numRemainingStandbys > 0) {
pendingStandbyTasksToClientId.put(activeTaskId, activeTaskClient);
tasksToRemainingStandbys.put(activeTaskId, numRemainingStandbys);
log.warn("Rack aware standby task assignment was not able to assign {} of {} standby tasks for the " +
"active task [{}] with the rack aware assignment tags {}. " +
"This may happen when there aren't enough application instances on different tag " +
"dimensions compared to an active and corresponding standby task. " +
"Consider launching application instances on different tag dimensions than [{}]. " +
"Standby task assignment will fall back to assigning standby tasks to the least loaded clients.",
numRemainingStandbys, numberOfStandbyClients,
activeTaskId, rackAwareAssignmentTags,
clientStates.get(activeTaskClient).clientTags());
} else {
tasksToRemainingStandbys.remove(activeTaskId);
}

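For intuition, the client-tag preference the assignor applies reduces to the predicate below (a standalone toy example, not the internal API; the tag values are invented):

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static java.util.Arrays.asList;

final Map<String, String> activeClientTags = new HashMap<>();
activeClientTags.put("zone", "eu-central-1a");
activeClientTags.put("cluster", "k8s-cluster-1");

final Map<String, String> candidateTags = new HashMap<>();
candidateTags.put("zone", "eu-central-1b");     // differs from the active client
candidateTags.put("cluster", "k8s-cluster-2");  // differs from the active client

// A candidate sits on "different tag dimensions" when it differs from the
// active client on every configured tag:
final Set<String> rackAwareAssignmentTags = new HashSet<>(asList("zone", "cluster"));
final boolean preferred = rackAwareAssignmentTags.stream()
    .noneMatch(tag -> activeClientTags.get(tag).equals(candidateTags.get(tag)));
// preferred == true; when no such client has spare capacity, the remaining
// standbys fall back to the least loaded clients, as the warn log above states.
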
View File

@ -50,18 +50,12 @@ class DefaultStandbyTaskAssignor implements StandbyTaskAssignor {
standbyTaskClientsByTaskLoad.offerAll(clients.keySet());
for (final TaskId task : statefulTaskIds) {
- final int numRemainingStandbys = pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks(clients,
- tasksToRemainingStandbys,
- standbyTaskClientsByTaskLoad,
- task);
- if (numRemainingStandbys > 0) {
- log.warn("Unable to assign {} of {} standby tasks for task [{}]. " +
- "There is not enough available capacity. You should " +
- "increase the number of application instances " +
- "to maintain the requested number of standby replicas.",
- numRemainingStandbys, numStandbyReplicas, task);
- }
+ pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks(numStandbyReplicas,
+ clients,
+ tasksToRemainingStandbys,
+ standbyTaskClientsByTaskLoad,
+ task,
+ log);
}
// returning false, because standby task assignment will never require a follow-up probing rebalance.

View File

@ -129,7 +129,7 @@ public class HighAvailabilityTaskAssignor implements TaskAssignor {
return;
}
- final StandbyTaskAssignor standbyTaskAssignor = createStandbyTaskAssignor(configs);
+ final StandbyTaskAssignor standbyTaskAssignor = StandbyTaskAssignorFactory.create(configs);
standbyTaskAssignor.assign(clientStates, allTaskIds, statefulTasks, configs);
@ -142,15 +142,6 @@ public class HighAvailabilityTaskAssignor implements TaskAssignor {
);
}
- // Visible for testing
- static StandbyTaskAssignor createStandbyTaskAssignor(final AssignmentConfigs configs) {
- if (!configs.rackAwareAssignmentTags.isEmpty()) {
- return new ClientTagAwareStandbyTaskAssignor();
- } else {
- return new DefaultStandbyTaskAssignor();
- }
- }
private static void balanceTasksOverThreads(final SortedMap<UUID, ClientState> clientStates,
final Function<ClientState, Set<TaskId>> currentAssignmentAccessor,
final BiConsumer<ClientState, TaskId> taskUnassignor,

View File

@ -24,6 +24,7 @@ import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.TaskManager;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -37,4 +38,5 @@ public class ReferenceContainer {
public final AtomicLong nextScheduledRebalanceMs = new AtomicLong(Long.MAX_VALUE);
public final Queue<StreamsException> nonFatalExceptionsToHandle = new LinkedList<>();
public Time time;
public Map<String, String> clientTags;
}

View File

@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals.assignment;
import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;
import java.util.Map;
import java.util.Set;
@ -33,10 +34,12 @@ final class StandbyTaskAssignmentUtils {
client -> clients.get(client).assignedTaskLoad());
}
- static int pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks(final Map<UUID, ClientState> clients,
- final Map<TaskId, Integer> tasksToRemainingStandbys,
- final ConstrainedPrioritySet standbyTaskClientsByTaskLoad,
- final TaskId activeTaskId) {
+ static void pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks(final int numStandbyReplicas,
+ final Map<UUID, ClientState> clients,
+ final Map<TaskId, Integer> tasksToRemainingStandbys,
+ final ConstrainedPrioritySet standbyTaskClientsByTaskLoad,
+ final TaskId activeTaskId,
+ final Logger log) {
int numRemainingStandbys = tasksToRemainingStandbys.get(activeTaskId);
while (numRemainingStandbys > 0) {
final UUID client = standbyTaskClientsByTaskLoad.poll(activeTaskId);
@ -49,7 +52,13 @@ final class StandbyTaskAssignmentUtils {
tasksToRemainingStandbys.put(activeTaskId, numRemainingStandbys);
}
- return numRemainingStandbys;
+ if (numRemainingStandbys > 0) {
+ log.warn("Unable to assign {} of {} standby tasks for task [{}]. " +
+ "There is not enough available capacity. You should " +
+ "increase the number of application instances " +
+ "to maintain the requested number of standby replicas.",
+ numRemainingStandbys, numStandbyReplicas, activeTaskId);
+ }
}
static Map<TaskId, Integer> computeTasksToRemainingStandbys(final int numStandbyReplicas,

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals.assignment;
class StandbyTaskAssignorFactory {
private StandbyTaskAssignorFactory() {}
static StandbyTaskAssignor create(final AssignorConfiguration.AssignmentConfigs configs) {
if (!configs.rackAwareAssignmentTags.isEmpty()) {
return new ClientTagAwareStandbyTaskAssignor();
} else {
return new DefaultStandbyTaskAssignor();
}
}
}

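With the factory extracted, the call site in HighAvailabilityTaskAssignor (shown above) reduces to a single line, keyed purely off whether the tag list is empty:

// rack.aware.assignment.tags set   -> ClientTagAwareStandbyTaskAssignor
// rack.aware.assignment.tags unset -> DefaultStandbyTaskAssignor (the default)
final StandbyTaskAssignor standbyTaskAssignor = StandbyTaskAssignorFactory.create(configs);
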
View File

@ -38,19 +38,25 @@ import org.junit.Test;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import static java.util.Collections.nCopies;
import static org.apache.kafka.common.IsolationLevel.READ_COMMITTED;
import static org.apache.kafka.common.IsolationLevel.READ_UNCOMMITTED;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE;
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_BETA;
import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_V2;
import static org.apache.kafka.streams.StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH;
import static org.apache.kafka.streams.StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH;
import static org.apache.kafka.streams.StreamsConfig.STATE_DIR_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix;
@ -1124,6 +1130,105 @@ public class StreamsConfigTest {
assertThrows(ConfigException.class, () -> new StreamsConfig(props));
}
@Test
public void shouldDefaultToEmptyListIfRackAwareAssignmentTagsIsNotSet() {
final StreamsConfig config = new StreamsConfig(props);
assertTrue(config.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG).isEmpty());
}
@Test
public void shouldThrowExceptionWhenClientTagsExceedTheLimit() {
final int limit = StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE + 1;
for (int i = 0; i < limit; i++) {
props.put(StreamsConfig.clientTagPrefix("k" + i), "v" + i);
}
final ConfigException exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
assertEquals(
String.format("At most %s client tags can be specified using %s prefix.",
StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE,
StreamsConfig.CLIENT_TAG_PREFIX
), exception.getMessage()
);
}
@Test
public void shouldThrowExceptionWhenRackAwareAssignmentTagsExceedsMaxListSize() {
final int limit = StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE + 1;
final List<String> rackAwareAssignmentTags = new ArrayList<>();
for (int i = 0; i < limit; i++) {
final String clientTagKey = "k" + i;
rackAwareAssignmentTags.add(clientTagKey);
props.put(StreamsConfig.clientTagPrefix(clientTagKey), "v" + i);
}
props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, String.join(",", rackAwareAssignmentTags));
final ConfigException exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
assertEquals(
String.format("Invalid value %s for configuration %s: exceeds maximum list size of [%s].",
rackAwareAssignmentTags,
StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE),
exception.getMessage()
);
}
@Test
public void shouldSetRackAwareAssignmentTags() {
props.put(StreamsConfig.clientTagPrefix("cluster"), "cluster-1");
props.put(StreamsConfig.clientTagPrefix("zone"), "eu-central-1a");
props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, "cluster,zone");
final StreamsConfig config = new StreamsConfig(props);
assertEquals(new HashSet<>(config.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG)),
mkSet("cluster", "zone"));
}
@Test
public void shouldGetEmptyMapIfClientTagsAreNotSet() {
final StreamsConfig config = new StreamsConfig(props);
assertTrue(config.getClientTags().isEmpty());
}
@Test
public void shouldGetClientTagsMapWhenSet() {
props.put(StreamsConfig.clientTagPrefix("zone"), "eu-central-1a");
props.put(StreamsConfig.clientTagPrefix("cluster"), "cluster-1");
final StreamsConfig config = new StreamsConfig(props);
final Map<String, String> clientTags = config.getClientTags();
assertEquals(clientTags.size(), 2);
assertEquals(clientTags.get("zone"), "eu-central-1a");
assertEquals(clientTags.get("cluster"), "cluster-1");
}
@Test
public void shouldThrowExceptionWhenClientTagRackAwarenessIsConfiguredWithUnknownTags() {
props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, "cluster");
assertThrows(ConfigException.class, () -> new StreamsConfig(props));
}
@Test
public void shouldThrowExceptionWhenClientTagKeyExceedMaxLimit() {
final String key = String.join("", nCopies(MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH + 1, "k"));
props.put(StreamsConfig.clientTagPrefix(key), "eu-central-1a");
final ConfigException exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
assertEquals(
String.format("Invalid value %s for configuration %s: Tag key exceeds maximum length of %s.",
key, StreamsConfig.CLIENT_TAG_PREFIX, StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH),
exception.getMessage()
);
}
@Test
public void shouldThrowExceptionWhenClientTagValueExceedMaxLimit() {
final String value = String.join("", nCopies(MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH + 1, "v"));
props.put(StreamsConfig.clientTagPrefix("x"), value);
final ConfigException exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props));
assertEquals(
String.format("Invalid value %s for configuration %s: Tag value exceeds maximum length of %s.",
value, StreamsConfig.CLIENT_TAG_PREFIX, StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH),
exception.getMessage()
);
}
static class MisconfiguredSerde implements Serde<Object> {
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {

View File

@ -0,0 +1,436 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.integration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.ThreadMetadata;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@Category({IntegrationTest.class})
public class RackAwarenessIntegrationTest {
private static final int NUM_BROKERS = 1;
private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
private static final String TAG_VALUE_K8_CLUSTER_1 = "k8s-cluster-1";
private static final String TAG_VALUE_K8_CLUSTER_2 = "k8s-cluster-2";
private static final String TAG_VALUE_K8_CLUSTER_3 = "k8s-cluster-3";
private static final String TAG_VALUE_EU_CENTRAL_1A = "eu-central-1a";
private static final String TAG_VALUE_EU_CENTRAL_1B = "eu-central-1b";
private static final String TAG_VALUE_EU_CENTRAL_1C = "eu-central-1c";
private static final int DEFAULT_NUMBER_OF_STATEFUL_SUB_TOPOLOGIES = 1;
private static final int DEFAULT_NUMBER_OF_PARTITIONS_OF_SUB_TOPOLOGIES = 2;
@Rule
public TestName testName = new TestName();
private static final String INPUT_TOPIC = "input-topic";
private static final String TAG_ZONE = "zone";
private static final String TAG_CLUSTER = "cluster";
private List<KafkaStreamsWithConfiguration> kafkaStreamsInstances;
private Properties baseConfiguration;
private Topology topology;
@BeforeClass
public static void createTopics() throws Exception {
CLUSTER.start();
CLUSTER.createTopic(INPUT_TOPIC, 6, 1);
}
@Before
public void setup() {
kafkaStreamsInstances = new ArrayList<>();
baseConfiguration = new Properties();
final String safeTestName = safeUniqueTestName(getClass(), testName);
final String applicationId = "app-" + safeTestName;
baseConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
baseConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
baseConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
baseConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
baseConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
}
@After
public void cleanup() throws IOException {
for (final KafkaStreamsWithConfiguration kafkaStreamsWithConfiguration : kafkaStreamsInstances) {
kafkaStreamsWithConfiguration.kafkaStreams.close(Duration.ofMillis(IntegrationTestUtils.DEFAULT_TIMEOUT));
IntegrationTestUtils.purgeLocalStreamsState(kafkaStreamsWithConfiguration.configuration);
}
kafkaStreamsInstances.clear();
}
@Test
public void shouldDoRebalancingWithMaximumNumberOfClientTags() throws Exception {
initTopology(3, 3);
final int numberOfStandbyReplicas = 1;
final List<String> clientTagKeys = new ArrayList<>();
final Map<String, String> clientTags1 = new HashMap<>();
final Map<String, String> clientTags2 = new HashMap<>();
for (int i = 0; i < StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE; i++) {
clientTagKeys.add("key-" + i);
}
for (int i = 0; i < clientTagKeys.size(); i++) {
final String key = clientTagKeys.get(i);
clientTags1.put(key, "value-1-" + i);
clientTags2.put(key, "value-2-" + i);
}
assertEquals(StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE, clientTagKeys.size());
Stream.of(clientTags1, clientTags2)
.forEach(clientTags -> assertEquals(String.format("clientTags with content '%s' " +
"did not match expected size", clientTags),
StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE,
clientTags.size()));
createAndStart(clientTags1, clientTagKeys, numberOfStandbyReplicas);
createAndStart(clientTags1, clientTagKeys, numberOfStandbyReplicas);
createAndStart(clientTags2, clientTagKeys, numberOfStandbyReplicas);
waitUntilAllKafkaStreamsClientsAreRunning();
assertTrue(isIdealTaskDistributionReachedForTags(clientTagKeys));
stopKafkaStreamsInstanceWithIndex(0);
waitUntilAllKafkaStreamsClientsAreRunning();
assertTrue(isIdealTaskDistributionReachedForTags(clientTagKeys));
}
@Test
public void shouldDistributeStandbyReplicasWhenAllClientsAreLocatedOnASameClusterTag() throws Exception {
initTopology();
final int numberOfStandbyReplicas = 1;
createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1A, TAG_VALUE_K8_CLUSTER_1), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1A, TAG_VALUE_K8_CLUSTER_1), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1A, TAG_VALUE_K8_CLUSTER_1), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1B, TAG_VALUE_K8_CLUSTER_1), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1B, TAG_VALUE_K8_CLUSTER_1), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1B, TAG_VALUE_K8_CLUSTER_1), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
waitUntilAllKafkaStreamsClientsAreRunning();
assertTrue(isIdealTaskDistributionReachedForTags(singletonList(TAG_ZONE)));
}
@Test
public void shouldDistributeStandbyReplicasOverMultipleClientTags() throws Exception {
initTopology();
final int numberOfStandbyReplicas = 2;
createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1A, TAG_VALUE_K8_CLUSTER_1), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1B, TAG_VALUE_K8_CLUSTER_1), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1C, TAG_VALUE_K8_CLUSTER_1), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1A, TAG_VALUE_K8_CLUSTER_2), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1B, TAG_VALUE_K8_CLUSTER_2), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1C, TAG_VALUE_K8_CLUSTER_2), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1A, TAG_VALUE_K8_CLUSTER_3), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1B, TAG_VALUE_K8_CLUSTER_3), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1C, TAG_VALUE_K8_CLUSTER_3), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
waitUntilAllKafkaStreamsClientsAreRunning();
assertTrue(isIdealTaskDistributionReachedForTags(asList(TAG_ZONE, TAG_CLUSTER)));
}
@Test
public void shouldDistributeStandbyReplicasWhenIdealDistributionCanNotBeAchieved() throws Exception {
initTopology();
final int numberOfStandbyReplicas = 2;
createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1A, TAG_VALUE_K8_CLUSTER_1), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1B, TAG_VALUE_K8_CLUSTER_1), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1C, TAG_VALUE_K8_CLUSTER_1), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1A, TAG_VALUE_K8_CLUSTER_2), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1B, TAG_VALUE_K8_CLUSTER_2), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1C, TAG_VALUE_K8_CLUSTER_2), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas);
waitUntilAllKafkaStreamsClientsAreRunning();
assertTrue(isIdealTaskDistributionReachedForTags(singletonList(TAG_ZONE)));
assertTrue(isPartialTaskDistributionReachedForTags(singletonList(TAG_CLUSTER)));
}
private void stopKafkaStreamsInstanceWithIndex(final int index) {
kafkaStreamsInstances.get(index).kafkaStreams.close(Duration.ofMillis(IntegrationTestUtils.DEFAULT_TIMEOUT));
kafkaStreamsInstances.remove(index);
}
private void waitUntilAllKafkaStreamsClientsAreRunning() throws Exception {
waitUntilAllKafkaStreamsClientsAreRunning(Duration.ofMillis(IntegrationTestUtils.DEFAULT_TIMEOUT));
}
private void waitUntilAllKafkaStreamsClientsAreRunning(final Duration timeout) throws Exception {
IntegrationTestUtils.waitForApplicationState(kafkaStreamsInstances.stream().map(it -> it.kafkaStreams).collect(Collectors.toList()),
KafkaStreams.State.RUNNING,
timeout);
}
private boolean isPartialTaskDistributionReachedForTags(final Collection<String> tagsToCheck) {
final Predicate<TaskClientTagDistribution> partialTaskClientTagDistributionTest = taskClientTagDistribution -> {
final Map<String, String> activeTaskClientTags = taskClientTagDistribution.activeTaskClientTags.clientTags;
return tagsAmongstActiveAndAtLeastOneStandbyTaskIsDifferent(taskClientTagDistribution.standbyTasksClientTags, activeTaskClientTags, tagsToCheck);
};
return isTaskDistributionTestSuccessful(partialTaskClientTagDistributionTest);
}
private boolean isIdealTaskDistributionReachedForTags(final Collection<String> tagsToCheck) {
final Predicate<TaskClientTagDistribution> idealTaskClientTagDistributionTest = taskClientTagDistribution -> {
final Map<String, String> activeTaskClientTags = taskClientTagDistribution.activeTaskClientTags.clientTags;
return tagsAmongstStandbyTasksAreDifferent(taskClientTagDistribution.standbyTasksClientTags, tagsToCheck)
&& tagsAmongstActiveAndAllStandbyTasksAreDifferent(taskClientTagDistribution.standbyTasksClientTags,
activeTaskClientTags,
tagsToCheck);
};
return isTaskDistributionTestSuccessful(idealTaskClientTagDistributionTest);
}
private boolean isTaskDistributionTestSuccessful(final Predicate<TaskClientTagDistribution> taskClientTagDistributionPredicate) {
final List<TaskClientTagDistribution> tasksClientTagDistributions = getTasksClientTagDistributions();
if (tasksClientTagDistributions.isEmpty()) {
return false;
}
return tasksClientTagDistributions.stream().allMatch(taskClientTagDistributionPredicate);
}
private static boolean tagsAmongstActiveAndAllStandbyTasksAreDifferent(final Collection<TaskClientTags> standbyTasks,
final Map<String, String> activeTaskClientTags,
final Collection<String> tagsToCheck) {
return standbyTasks.stream().allMatch(standbyTask -> tagsToCheck.stream().noneMatch(tag -> activeTaskClientTags.get(tag).equals(standbyTask.clientTags.get(tag))));
}
private static boolean tagsAmongstActiveAndAtLeastOneStandbyTaskIsDifferent(final Collection<TaskClientTags> standbyTasks,
final Map<String, String> activeTaskClientTags,
final Collection<String> tagsToCheck) {
return standbyTasks.stream().anyMatch(standbyTask -> tagsToCheck.stream().noneMatch(tag -> activeTaskClientTags.get(tag).equals(standbyTask.clientTags.get(tag))));
}
private static boolean tagsAmongstStandbyTasksAreDifferent(final Collection<TaskClientTags> standbyTasks, final Collection<String> tagsToCheck) {
final Map<String, Integer> statistics = new HashMap<>();
for (final TaskClientTags standbyTask : standbyTasks) {
for (final String tag : tagsToCheck) {
final String tagValue = standbyTask.clientTags.get(tag);
final Integer tagValueOccurrence = statistics.getOrDefault(tagValue, 0);
statistics.put(tagValue, tagValueOccurrence + 1);
}
}
return statistics.values().stream().noneMatch(occurrence -> occurrence > 1);
}
private void initTopology() {
initTopology(DEFAULT_NUMBER_OF_PARTITIONS_OF_SUB_TOPOLOGIES, DEFAULT_NUMBER_OF_STATEFUL_SUB_TOPOLOGIES);
}
private void initTopology(final int numberOfPartitionsOfSubTopologies, final int numberOfStatefulSubTopologies) {
final StreamsBuilder builder = new StreamsBuilder();
final String stateStoreName = "myTransformState";
final StoreBuilder<KeyValueStore<Integer, Integer>> keyValueStoreBuilder = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(stateStoreName),
Serdes.Integer(),
Serdes.Integer()
);
builder.addStateStore(keyValueStoreBuilder);
final KStream<Integer, Integer> stream = builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.Integer()));
// Stateless sub-topology
stream.repartition(Repartitioned.numberOfPartitions(numberOfPartitionsOfSubTopologies)).filter((k, v) -> true);
// Stateful sub-topologies
for (int i = 0; i < numberOfStatefulSubTopologies; i++) {
stream.repartition(Repartitioned.numberOfPartitions(numberOfPartitionsOfSubTopologies))
.groupByKey()
.reduce(Integer::sum);
}
topology = builder.build();
}
private List<TaskClientTagDistribution> getTasksClientTagDistributions() {
final List<TaskClientTagDistribution> taskClientTags = new ArrayList<>();
for (final KafkaStreamsWithConfiguration kafkaStreamsInstance : kafkaStreamsInstances) {
final StreamsConfig config = new StreamsConfig(kafkaStreamsInstance.configuration);
for (final ThreadMetadata localThreadsMetadata : kafkaStreamsInstance.kafkaStreams.metadataForLocalThreads()) {
localThreadsMetadata.activeTasks().forEach(activeTask -> {
final TaskId activeTaskId = activeTask.taskId();
final Map<String, String> clientTags = config.getClientTags();
final List<TaskClientTags> standbyTasks = findStandbysForActiveTask(activeTaskId);
if (!standbyTasks.isEmpty()) {
final TaskClientTags activeTaskView = new TaskClientTags(activeTaskId, clientTags);
taskClientTags.add(new TaskClientTagDistribution(activeTaskView, standbyTasks));
}
});
}
}
return taskClientTags;
}
private List<TaskClientTags> findStandbysForActiveTask(final TaskId taskId) {
final List<TaskClientTags> standbyTasks = new ArrayList<>();
for (final KafkaStreamsWithConfiguration kafkaStreamsInstance : kafkaStreamsInstances) {
for (final ThreadMetadata localThreadsMetadata : kafkaStreamsInstance.kafkaStreams.metadataForLocalThreads()) {
localThreadsMetadata.standbyTasks().forEach(standbyTask -> {
final TaskId standbyTaskId = standbyTask.taskId();
if (taskId.equals(standbyTaskId)) {
final StreamsConfig config = new StreamsConfig(kafkaStreamsInstance.configuration);
standbyTasks.add(new TaskClientTags(standbyTaskId, config.getClientTags()));
}
});
}
}
return standbyTasks;
}
private static Map<String, String> buildClientTags(final String zone, final String cluster) {
final Map<String, String> clientTags = new HashMap<>();
clientTags.put(TAG_ZONE, zone);
clientTags.put(TAG_CLUSTER, cluster);
return clientTags;
}
private void createAndStart(final Map<String, String> clientTags,
final Collection<String> rackAwareAssignmentTags,
final int numberOfStandbyReplicas) {
final Properties streamsConfiguration = createStreamsConfiguration(clientTags, rackAwareAssignmentTags, numberOfStandbyReplicas);
final KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsConfiguration);
kafkaStreamsInstances.add(new KafkaStreamsWithConfiguration(streamsConfiguration, kafkaStreams));
kafkaStreams.start();
}
private Properties createStreamsConfiguration(final Map<String, String> clientTags,
final Collection<String> rackAwareAssignmentTags,
final int numStandbyReplicas) {
final Properties streamsConfiguration = new Properties();
streamsConfiguration.putAll(baseConfiguration);
streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, numStandbyReplicas);
streamsConfiguration.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, String.join(",", rackAwareAssignmentTags));
clientTags.forEach((key, value) -> streamsConfiguration.put(StreamsConfig.clientTagPrefix(key), value));
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(String.join("-", clientTags.values())).getPath());
return streamsConfiguration;
}
private static final class KafkaStreamsWithConfiguration {
private final Properties configuration;
private final KafkaStreams kafkaStreams;
KafkaStreamsWithConfiguration(final Properties configuration, final KafkaStreams kafkaStreams) {
this.configuration = configuration;
this.kafkaStreams = kafkaStreams;
}
}
private static final class TaskClientTagDistribution {
private final TaskClientTags activeTaskClientTags;
private final List<TaskClientTags> standbyTasksClientTags;
TaskClientTagDistribution(final TaskClientTags activeTaskClientTags, final List<TaskClientTags> standbyTasksClientTags) {
this.activeTaskClientTags = activeTaskClientTags;
this.standbyTasksClientTags = standbyTasksClientTags;
}
@Override
public String toString() {
return "TaskDistribution{" +
"activeTaskClientTagsView=" + activeTaskClientTags +
", standbyTasks=" + standbyTasksClientTags +
'}';
}
}
private static final class TaskClientTags {
private final TaskId taskId;
private final Map<String, String> clientTags;
TaskClientTags(final TaskId taskId, final Map<String, String> clientTags) {
this.taskId = taskId;
this.clientTags = clientTags;
}
@Override
public String toString() {
return "TaskClientTags{" +
"taskId=" + taskId +
", clientTags=" + clientTags +
'}';
}
}
}

View File

@ -16,8 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;
- import java.time.Duration;
- import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListOffsetsResult;
@ -79,6 +77,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.nio.ByteBuffer;
+ import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -86,6 +85,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+ import java.util.Properties;
import java.util.Set;
import java.util.SortedSet;
import java.util.UUID;
@ -196,6 +196,7 @@ public class StreamsPartitionAssignorTest {
private StreamsMetadataState streamsMetadataState = EasyMock.createNiceMock(StreamsMetadataState.class);
private final Map<String, Subscription> subscriptions = new HashMap<>();
private final Class<? extends TaskAssignor> taskAssignor;
private Map<String, String> clientTags;
private final ReferenceContainer referenceContainer = new ReferenceContainer();
private final MockTime time = new MockTime();
@ -210,6 +211,7 @@ public class StreamsPartitionAssignorTest {
referenceContainer.taskManager = taskManager;
referenceContainer.streamsMetadataState = streamsMetadataState;
referenceContainer.time = time;
referenceContainer.clientTags = clientTags != null ? clientTags : EMPTY_CLIENT_TAGS;
configurationMap.put(InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, referenceContainer);
configurationMap.put(InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, taskAssignor.getName());
return configurationMap;
@ -2190,6 +2192,21 @@ public class StreamsPartitionAssignorTest {
equalTo(AssignorError.ASSIGNMENT_ERROR.code()));
}
@Test
public void testClientTags() {
clientTags = mkMap(mkEntry("cluster", "cluster1"), mkEntry("zone", "az1"));
createDefaultMockTaskManager();
configureDefaultPartitionAssignor();
final Set<String> topics = mkSet("input");
final Subscription subscription = new Subscription(new ArrayList<>(topics),
partitionAssignor.subscriptionUserData(topics));
final SubscriptionInfo info = getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, uniqueField, clientTags);
assertEquals(singletonList("input"), subscription.topics());
assertEquals(info, SubscriptionInfo.decode(subscription.userData()));
assertEquals(clientTags, partitionAssignor.clientTags());
}
private static class CorruptedInternalTopologyBuilder extends InternalTopologyBuilder {
private Map<Subtopology, TopicsInfo> corruptedTopicGroups;

View File

@ -166,6 +166,15 @@ public final class AssignmentTestUtils {
LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks, standbyTasks), uniqueField, 0, EMPTY_CLIENT_TAGS);
}
public static SubscriptionInfo getInfo(final UUID processId,
final Set<TaskId> prevTasks,
final Set<TaskId> standbyTasks,
final byte uniqueField,
final Map<String, String> clientTags) {
return new SubscriptionInfo(
LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks, standbyTasks), uniqueField, 0, clientTags);
}
// Stub offset sums for when we only care about the prev/standby task sets, not the actual offsets
private static Map<TaskId, Long> getTaskOffsetSums(final Collection<TaskId> activeTasks, final Collection<TaskId> standbyTasks) {
final Map<TaskId, Long> taskOffsetSums = activeTasks.stream().collect(Collectors.toMap(t -> t, t -> Task.LATEST_OFFSET));

View File

@ -81,8 +81,43 @@ public class ClientTagAwareStandbyTaskAssignorTest {
standbyTaskAssignor = new ClientTagAwareStandbyTaskAssignor();
}
@Test
public void shouldNotAssignStatelessTasksToAnyClients() {
final Set<TaskId> statefulTasks = mkSet(
TASK_1_0,
TASK_1_1,
TASK_1_2
);
final Map<UUID, ClientState> clientStates = mkMap(
mkEntry(UUID_1, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_1), mkEntry(CLUSTER_TAG, CLUSTER_1)), TASK_0_0, TASK_1_0)),
mkEntry(UUID_2, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_2), mkEntry(CLUSTER_TAG, CLUSTER_1)))),
mkEntry(UUID_3, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_3), mkEntry(CLUSTER_TAG, CLUSTER_1)))),
mkEntry(UUID_4, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_1), mkEntry(CLUSTER_TAG, CLUSTER_2)), TASK_0_1, TASK_1_1)),
mkEntry(UUID_5, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_2), mkEntry(CLUSTER_TAG, CLUSTER_2)))),
mkEntry(UUID_6, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_3), mkEntry(CLUSTER_TAG, CLUSTER_2)))),
mkEntry(UUID_7, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_1), mkEntry(CLUSTER_TAG, CLUSTER_3)), TASK_0_2, TASK_1_2)),
mkEntry(UUID_8, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_2), mkEntry(CLUSTER_TAG, CLUSTER_3)))),
mkEntry(UUID_9, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_3), mkEntry(CLUSTER_TAG, CLUSTER_3))))
);
final Set<TaskId> allActiveTasks = findAllActiveTasks(clientStates);
final AssignmentConfigs assignmentConfigs = newAssignmentConfigs(2, ZONE_TAG, CLUSTER_TAG);
standbyTaskAssignor.assign(clientStates, allActiveTasks, statefulTasks, assignmentConfigs);
final Set<TaskId> statelessTasks = allActiveTasks.stream().filter(taskId -> !statefulTasks.contains(taskId)).collect(Collectors.toSet());
assertTrue(
clientStates.values().stream().allMatch(clientState -> statelessTasks.stream().noneMatch(clientState::hasStandbyTask))
);
}
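The test above relies on two small helpers, sketched here under the assumption that they behave as their names suggest and that the usual java.util.stream imports are in scope (the production versions live in the assignment internals):

    // Collect every task some client currently hosts as active.
    private static Set<TaskId> findAllActiveTasks(final Map<UUID, ClientState> clientStates) {
        return clientStates.values()
                           .stream()
                           .flatMap(clientState -> clientState.activeTasks().stream())
                           .collect(Collectors.toSet());
    }

    // Initially, every active task still needs numStandbyReplicas standby replicas.
    static Map<TaskId, Integer> computeTasksToRemainingStandbys(final int numStandbyReplicas,
                                                                final Set<TaskId> allActiveTasks) {
        return allActiveTasks.stream()
                             .collect(Collectors.toMap(Function.identity(), task -> numStandbyReplicas));
    }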
@Test
public void shouldRemoveClientToRemainingStandbysAndNotPopulatePendingStandbyTasksToClientIdWhenAllStandbyTasksWereAssigned() {
final int numStandbyReplicas = 2;
final Set<String> rackAwareAssignmentTags = mkSet(ZONE_TAG, CLUSTER_TAG);
final Map<UUID, ClientState> clientStates = mkMap(
mkEntry(UUID_1, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_1), mkEntry(CLUSTER_TAG, CLUSTER_1)), TASK_0_0)),
@ -102,10 +137,11 @@ public class ClientTagAwareStandbyTaskAssignorTest {
fillClientsTagStatistics(clientStates, tagEntryToClients, tagKeyToValues);
final Map<TaskId, UUID> pendingStandbyTasksToClientId = new HashMap<>();
final Map<TaskId, Integer> tasksToRemainingStandbys = computeTasksToRemainingStandbys(2, allActiveTasks);
final Map<TaskId, Integer> tasksToRemainingStandbys = computeTasksToRemainingStandbys(numStandbyReplicas, allActiveTasks);
for (final TaskId activeTaskId : allActiveTasks) {
assignStandbyTasksToClientsWithDifferentTags(
numStandbyReplicas,
constrainedPrioritySet,
activeTaskId,
taskToClientId.get(activeTaskId),
@ -132,6 +168,7 @@ public class ClientTagAwareStandbyTaskAssignorTest {
);
final ConstrainedPrioritySet constrainedPrioritySet = createLeastLoadedPrioritySetConstrainedByAssignedTask(clientStates);
final int numStandbyReplicas = 3;
final Set<TaskId> allActiveTasks = findAllActiveTasks(clientStates);
final Map<TaskId, UUID> taskToClientId = mkMap(mkEntry(TASK_0_0, UUID_1),
mkEntry(TASK_0_1, UUID_2),
@ -143,10 +180,11 @@ public class ClientTagAwareStandbyTaskAssignorTest {
fillClientsTagStatistics(clientStates, tagEntryToClients, tagKeyToValues);
final Map<TaskId, UUID> pendingStandbyTasksToClientId = new HashMap<>();
final Map<TaskId, Integer> tasksToRemainingStandbys = computeTasksToRemainingStandbys(3, allActiveTasks);
final Map<TaskId, Integer> tasksToRemainingStandbys = computeTasksToRemainingStandbys(numStandbyReplicas, allActiveTasks);
for (final TaskId activeTaskId : allActiveTasks) {
assignStandbyTasksToClientsWithDifferentTags(
numStandbyReplicas,
constrainedPrioritySet,
activeTaskId,
taskToClientId.get(activeTaskId),
View File
@ -20,10 +20,8 @@ import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import org.junit.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@ -31,7 +29,6 @@ import java.util.stream.Collectors;
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
@ -68,7 +65,6 @@ import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class HighAvailabilityTaskAssignorTest {
@ -814,27 +810,6 @@ public class HighAvailabilityTaskAssignorTest {
assertThat(probingRebalanceNeeded, is(false));
}
@Test
public void shouldReturnClientTagAwareStandbyTaskAssignorWhenRackAwareAssignmentTagsIsSet() {
final StandbyTaskAssignor standbyTaskAssignor = HighAvailabilityTaskAssignor.createStandbyTaskAssignor(newAssignmentConfigs(1, singletonList("az")));
assertTrue(standbyTaskAssignor instanceof ClientTagAwareStandbyTaskAssignor);
}
@Test
public void shouldReturnDefaultStandbyTaskAssignorWhenRackAwareAssignmentTagsIsEmpty() {
final StandbyTaskAssignor standbyTaskAssignor = HighAvailabilityTaskAssignor.createStandbyTaskAssignor(newAssignmentConfigs(1, Collections.emptyList()));
assertTrue(standbyTaskAssignor instanceof DefaultStandbyTaskAssignor);
}
private static AssignorConfiguration.AssignmentConfigs newAssignmentConfigs(final int numStandbyReplicas,
final List<String> rackAwareAssignmentTags) {
return new AssignorConfiguration.AssignmentConfigs(0L,
1,
numStandbyReplicas,
60000L,
rackAwareAssignmentTags);
}
private static void assertHasNoActiveTasks(final ClientState... clients) {
for (final ClientState client : clients) {
assertThat(client.activeTasks(), is(empty()));
View File
@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals.assignment;
import org.apache.kafka.streams.processor.TaskId;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import java.util.Map;
import java.util.Set;
@ -36,8 +37,16 @@ import static org.apache.kafka.streams.processor.internals.assignment.StandbyTas
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
public class StandbyTaskAssignmentUtilsTest {
private static final Set<TaskId> ACTIVE_TASKS = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
private Map<UUID, ClientState> clients;
@ -45,6 +54,7 @@ public class StandbyTaskAssignmentUtilsTest {
@Before
public void setup() {
clients = getClientStatesMap(ACTIVE_TASKS.stream().map(StandbyTaskAssignmentUtilsTest::mkState).toArray(ClientState[]::new));
clientsByTaskLoad = new ConstrainedPrioritySet(
(client, task) -> !clients.get(client).hasAssignedTask(task),
@ -55,38 +65,38 @@ public class StandbyTaskAssignmentUtilsTest {
@Test
public void shouldReturnNumberOfStandbyTasksThatWereNotAssigned() {
final Map<TaskId, Integer> tasksToRemainingStandbys = computeTasksToRemainingStandbys(3, ACTIVE_TASKS);
final Logger logMock = mock(Logger.class);
final int numStandbyReplicas = 3;
final Map<TaskId, Integer> tasksToRemainingStandbys = computeTasksToRemainingStandbys(numStandbyReplicas, ACTIVE_TASKS);
assertTrue(tasksToRemainingStandbys.keySet()
.stream()
.map(taskId -> pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks(
clients,
tasksToRemainingStandbys,
clientsByTaskLoad,
taskId
))
.allMatch(numRemainingStandbys -> numRemainingStandbys == 1));
tasksToRemainingStandbys.keySet().forEach(taskId -> pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks(numStandbyReplicas,
clients,
tasksToRemainingStandbys,
clientsByTaskLoad,
taskId,
logMock));
assertTrue(ACTIVE_TASKS.stream().allMatch(activeTask -> tasksToRemainingStandbys.get(activeTask) == 1));
assertTrue(areStandbyTasksPresentForAllActiveTasks(2));
verify(logMock, times(ACTIVE_TASKS.size())).warn(anyString(), anyInt(), anyInt(), any());
}
@Test
public void shouldReturnZeroWhenAllStandbyTasksWereSuccessfullyAssigned() {
final Map<TaskId, Integer> tasksToRemainingStandbys = computeTasksToRemainingStandbys(1, ACTIVE_TASKS);
final Logger logMock = mock(Logger.class);
final int numStandbyReplicas = 1;
final Map<TaskId, Integer> tasksToRemainingStandbys = computeTasksToRemainingStandbys(numStandbyReplicas, ACTIVE_TASKS);
assertTrue(tasksToRemainingStandbys.keySet()
.stream()
.map(taskId -> pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks(
clients,
tasksToRemainingStandbys,
clientsByTaskLoad,
taskId
))
.allMatch(numRemainingStandbys -> numRemainingStandbys == 0));
tasksToRemainingStandbys.keySet().forEach(taskId -> pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks(numStandbyReplicas,
clients,
tasksToRemainingStandbys,
clientsByTaskLoad,
taskId,
logMock));
assertTrue(ACTIVE_TASKS.stream().allMatch(activeTask -> tasksToRemainingStandbys.get(activeTask) == 0));
assertTrue(areStandbyTasksPresentForAllActiveTasks(1));
verifyNoInteractions(logMock);
}
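The reworked helper now takes numStandbyReplicas up front and a Logger at the end, reporting leftovers through the map rather than a return value. A sketch of the behavior the mock verifications imply — the loop structure and the ConstrainedPrioritySet calls are assumptions, not the verbatim implementation:

    static void pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks(final int numStandbyReplicas,
                                                                       final Map<UUID, ClientState> clients,
                                                                       final Map<TaskId, Integer> tasksToRemainingStandbys,
                                                                       final ConstrainedPrioritySet clientsByTaskLoad,
                                                                       final TaskId activeTaskId,
                                                                       final Logger log) {
        int numRemainingStandbys = tasksToRemainingStandbys.get(activeTaskId);
        while (numRemainingStandbys > 0) {
            final UUID client = clientsByTaskLoad.poll(activeTaskId);
            if (client == null) {
                break; // no remaining client may host another replica of this task
            }
            clients.get(client).assignStandby(activeTaskId);
            clientsByTaskLoad.offer(client);
            numRemainingStandbys--;
        }
        tasksToRemainingStandbys.put(activeTaskId, numRemainingStandbys);
        if (numRemainingStandbys > 0) {
            // One warn call per under-replicated task, matching
            // verify(logMock, times(ACTIVE_TASKS.size())).warn(anyString(), anyInt(), anyInt(), any()).
            log.warn("Unable to assign {} of {} standby tasks for task [{}].",
                     numRemainingStandbys, numStandbyReplicas, activeTaskId);
        }
    }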
@Test
View File
@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals.assignment;
import org.junit.Test;
import java.util.Collections;
import java.util.List;
import static java.util.Collections.singletonList;
import static org.junit.Assert.assertTrue;
public class StandbyTaskAssignorFactoryTest {
private static final long ACCEPTABLE_RECOVERY_LAG = 0L;
private static final int MAX_WARMUP_REPLICAS = 1;
private static final int NUMBER_OF_STANDBY_REPLICAS = 1;
private static final long PROBING_REBALANCE_INTERVAL_MS = 60000L;
@Test
public void shouldReturnClientTagAwareStandbyTaskAssignorWhenRackAwareAssignmentTagsIsSet() {
final StandbyTaskAssignor standbyTaskAssignor = StandbyTaskAssignorFactory.create(newAssignmentConfigs(singletonList("az")));
assertTrue(standbyTaskAssignor instanceof ClientTagAwareStandbyTaskAssignor);
}
@Test
public void shouldReturnDefaultStandbyTaskAssignorWhenRackAwareAssignmentTagsIsEmpty() {
final StandbyTaskAssignor standbyTaskAssignor = StandbyTaskAssignorFactory.create(newAssignmentConfigs(Collections.emptyList()));
assertTrue(standbyTaskAssignor instanceof DefaultStandbyTaskAssignor);
}
private static AssignorConfiguration.AssignmentConfigs newAssignmentConfigs(final List<String> rackAwareAssignmentTags) {
return new AssignorConfiguration.AssignmentConfigs(ACCEPTABLE_RECOVERY_LAG,
MAX_WARMUP_REPLICAS,
NUMBER_OF_STANDBY_REPLICAS,
PROBING_REBALANCE_INTERVAL_MS,
rackAwareAssignmentTags);
}
}
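For reference, the factory these two tests pin down reduces to a one-branch dispatch; an assumed shape, not necessarily the verbatim class:

    final class StandbyTaskAssignorFactory {
        private StandbyTaskAssignorFactory() {}

        static StandbyTaskAssignor create(final AssignorConfiguration.AssignmentConfigs configs) {
            // Tag-aware assignment only kicks in when rack.aware.assignment.tags is non-empty.
            return configs.rackAwareAssignmentTags.isEmpty()
                ? new DefaultStandbyTaskAssignor()
                : new ClientTagAwareStandbyTaskAssignor();
        }
    }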