diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java index 85b0103e59a..8c91a2523e5 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java @@ -1121,6 +1121,27 @@ public class ConfigDef { } } + public static class ListSize implements Validator { + final int maxSize; + + private ListSize(final int maxSize) { + this.maxSize = maxSize; + } + + public static ListSize atMostOfSize(final int maxSize) { + return new ListSize(maxSize); + } + + @Override + public void ensureValid(final String name, final Object value) { + @SuppressWarnings("unchecked") + List values = (List) value; + if (values.size() > maxSize) { + throw new ConfigException(name, value, "exceeds maximum list size of [" + maxSize + "]."); + } + } + } + public static class ConfigKey { public final String name; public final Type type; diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java index 893f68b89e6..0e5af1f5cb3 100644 --- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java +++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.common.config; import org.apache.kafka.common.config.ConfigDef.CaseInsensitiveValidString; import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.ListSize; import org.apache.kafka.common.config.ConfigDef.Range; import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigDef.ValidString; @@ -38,6 +39,8 @@ import java.util.Properties; import java.util.Set; import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static 
org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; @@ -426,7 +429,7 @@ public class ConfigDefTest { public void testMissingDependentConfigs() { // Should not be possible to parse a config if a dependent config has not been defined final ConfigDef configDef = new ConfigDef() - .define("parent", Type.STRING, Importance.HIGH, "parent docs", "group", 1, Width.LONG, "Parent", Collections.singletonList("child")); + .define("parent", Type.STRING, Importance.HIGH, "parent docs", "group", 1, Width.LONG, "Parent", singletonList("child")); assertThrows(ConfigException.class, () -> configDef.parse(Collections.emptyMap())); } @@ -438,7 +441,7 @@ public class ConfigDefTest { assertEquals(new HashSet<>(Arrays.asList("a")), baseConfigDef.getConfigsWithNoParent()); final ConfigDef configDef = new ConfigDef(baseConfigDef) - .define("parent", Type.STRING, Importance.HIGH, "parent docs", "group", 1, Width.LONG, "Parent", Collections.singletonList("child")) + .define("parent", Type.STRING, Importance.HIGH, "parent docs", "group", 1, Width.LONG, "Parent", singletonList("child")) .define("child", Type.STRING, Importance.HIGH, "docs"); assertEquals(new HashSet<>(Arrays.asList("a", "parent")), configDef.getConfigsWithNoParent()); @@ -541,7 +544,7 @@ public class ConfigDefTest { .define("opt2.of.group2", Type.BOOLEAN, false, Importance.HIGH, "Doc doc doc doc.", "Group Two", 1, Width.NONE, "..", Collections.emptyList()) .define("opt1.of.group2", Type.BOOLEAN, false, Importance.HIGH, "Doc doc doc doc doc.", - "Group Two", 0, Width.NONE, "..", Collections.singletonList("some.option")) + "Group Two", 0, Width.NONE, "..", singletonList("some.option")) .define("poor.opt", Type.STRING, "foo", Importance.HIGH, "Doc doc doc doc."); final String expectedRst = "" + @@ -722,4 +725,37 @@ public class ConfigDefTest { 
assertEquals(" (365 days)", ConfigDef.niceTimeUnits(Duration.ofDays(365).toMillis())); } + @Test + public void testThrowsExceptionWhenListSizeExceedsLimit() { + final ConfigException exception = assertThrows(ConfigException.class, () -> new ConfigDef().define("lst", + Type.LIST, + asList("a", "b"), + ListSize.atMostOfSize(1), + Importance.HIGH, + "lst doc")); + assertEquals("Invalid value [a, b] for configuration lst: exceeds maximum list size of [1].", + exception.getMessage()); + } + + @Test + public void testNoExceptionIsThrownWhenListSizeEqualsTheLimit() { + final List lst = asList("a", "b", "c"); + assertDoesNotThrow(() -> new ConfigDef().define("lst", + Type.LIST, + lst, + ListSize.atMostOfSize(lst.size()), + Importance.HIGH, + "lst doc")); + } + + @Test + public void testNoExceptionIsThrownWhenListSizeIsBelowTheLimit() { + assertDoesNotThrow(() -> new ConfigDef().define("lst", + Type.LIST, + asList("a", "b"), + ListSize.atMostOfSize(3), + Importance.HIGH, + "lst doc")); + } + } diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 672df55c99e..66cee6aed7c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -48,12 +48,16 @@ import java.io.File; import java.time.Duration; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Properties; import java.util.Set; +import java.util.stream.Collectors; import static org.apache.kafka.common.IsolationLevel.READ_COMMITTED; +import static org.apache.kafka.common.config.ConfigDef.ListSize.atMostOfSize; import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; import static org.apache.kafka.common.config.ConfigDef.Range.between; import static org.apache.kafka.common.config.ConfigDef.ValidString.in; 
@@ -148,6 +152,12 @@ public class StreamsConfig extends AbstractConfig { public static final int DUMMY_THREAD_INDEX = 1; public static final long MAX_TASK_IDLE_MS_DISABLED = -1; + // We impose these limitations because client tags are encoded into the subscription info, + // which is part of the group metadata message that is persisted into the internal topic. + public static final int MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE = 5; + public static final int MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH = 20; + public static final int MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH = 30; + /** * Prefix used to provide default topic configs to be applied when creating internal topics. * These should be valid properties from {@link org.apache.kafka.common.config.TopicConfig TopicConfig}. @@ -212,6 +222,15 @@ public class StreamsConfig extends AbstractConfig { @SuppressWarnings("WeakerAccess") public static final String ADMIN_CLIENT_PREFIX = "admin."; + /** + * Prefix used to add arbitrary tags to a Kafka Stream's instance as key-value pairs. + * Example: + * client.tag.zone=zone1 + * client.tag.cluster=cluster1 + */ + @SuppressWarnings("WeakerAccess") + public static final String CLIENT_TAG_PREFIX = "client.tag."; + /** * Config value for parameter {@link #TOPOLOGY_OPTIMIZATION_CONFIG "topology.optimization"} for disabling topology optimization */ @@ -511,6 +530,13 @@ public class StreamsConfig extends AbstractConfig { @SuppressWarnings("WeakerAccess") public static final String RECEIVE_BUFFER_CONFIG = CommonClientConfigs.RECEIVE_BUFFER_CONFIG; + /** {@code rack.aware.assignment.tags} */ + @SuppressWarnings("WeakerAccess") + public static final String RACK_AWARE_ASSIGNMENT_TAGS_CONFIG = "rack.aware.assignment.tags"; + private static final String RACK_AWARE_ASSIGNMENT_TAGS_DOC = "List of client tag keys used to distribute standby replicas across Kafka Streams instances." 
/**
 * Prefix a client tag key with {@link #CLIENT_TAG_PREFIX}.
 *
 * @param clientTagKey client tag key
 * @return {@link #CLIENT_TAG_PREFIX} + {@code clientTagKey}
 */
public static String clientTagPrefix(final String clientTagKey) {
    return CLIENT_TAG_PREFIX + clientTagKey;
}
@@ -1159,9 +1201,43 @@ public class StreamsConfig extends AbstractConfig { configUpdates.put(COMMIT_INTERVAL_MS_CONFIG, EOS_DEFAULT_COMMIT_INTERVAL_MS); } + validateRackAwarenessConfiguration(); + return configUpdates; } + private void validateRackAwarenessConfiguration() { + final List rackAwareAssignmentTags = getList(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG); + final Map clientTags = getClientTags(); + + if (clientTags.size() > MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE) { + throw new ConfigException("At most " + MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE + " client tags " + + "can be specified using " + CLIENT_TAG_PREFIX + " prefix."); + } + + for (final String rackAwareAssignmentTag : rackAwareAssignmentTags) { + if (!clientTags.containsKey(rackAwareAssignmentTag)) { + throw new ConfigException(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, + rackAwareAssignmentTags, + "Contains invalid value [" + rackAwareAssignmentTag + "] " + + "which doesn't have corresponding tag set via [" + CLIENT_TAG_PREFIX + "] prefix."); + } + } + + clientTags.forEach((tagKey, tagValue) -> { + if (tagKey.length() > MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH) { + throw new ConfigException(CLIENT_TAG_PREFIX, + tagKey, + "Tag key exceeds maximum length of " + MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH + "."); + } + if (tagValue.length() > MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH) { + throw new ConfigException(CLIENT_TAG_PREFIX, + tagValue, + "Tag value exceeds maximum length of " + MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH + "."); + } + }); + } + private Map getCommonConsumerConfigs() { final Map clientProvidedProps = getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames()); @@ -1295,6 +1371,7 @@ public class StreamsConfig extends AbstractConfig { consumerProps.put(PROBING_REBALANCE_INTERVAL_MS_CONFIG, getLong(PROBING_REBALANCE_INTERVAL_MS_CONFIG)); consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamsPartitionAssignor.class.getName()); 
consumerProps.put(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG)); + consumerProps.put(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, getList(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG)); // disable auto topic creation consumerProps.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false"); @@ -1441,6 +1518,21 @@ public class StreamsConfig extends AbstractConfig { return props; } + /** + * Get the configured client tags set with {@link #CLIENT_TAG_PREFIX} prefix. + * + * @return Map of the client tags. + */ + @SuppressWarnings("WeakerAccess") + public Map getClientTags() { + return originalsWithPrefix(CLIENT_TAG_PREFIX).entrySet().stream().collect( + Collectors.toMap( + Map.Entry::getKey, + tagEntry -> Objects.toString(tagEntry.getValue()) + ) + ); + } + private Map getClientPropsWithPrefix(final String prefix, final Set configNames) { final Map props = clientProps(configNames, originals()); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 7893a06fb79..7401e539c4f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -345,6 +345,7 @@ public class StreamThread extends Thread { referenceContainer.adminClient = adminClient; referenceContainer.streamsMetadataState = streamsMetadataState; referenceContainer.time = time; + referenceContainer.clientTags = config.getClientTags(); log.info("Creating restore consumer client"); final Map restoreConsumerConfigs = config.getRestoreConsumerConfigs(getRestoreConsumerClientId(threadId)); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 
d2fa90524f5..2af2fba7187 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -128,7 +128,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf private final ClientState state; private final SortedSet consumers; - ClientMetadata(final String endPoint) { + ClientMetadata(final String endPoint, final Map clientTags) { // get the host info, or null if no endpoint is configured (ie endPoint == null) hostInfo = HostInfo.buildFromEndpoint(endPoint); @@ -136,8 +136,8 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf // initialize the consumer memberIds consumers = new TreeSet<>(); - // initialize the client state - state = new ClientState(); + // initialize the client state with client tags + state = new ClientState(clientTags); } void addConsumer(final String consumerMemberId, final List ownedPartitions) { @@ -189,6 +189,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf private Supplier taskAssignorSupplier; private byte uniqueField; + private Map clientTags; /** * We need to have the PartitionAssignor and its StreamThread to be mutually accessible since the former needs @@ -223,6 +224,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf taskAssignorSupplier = assignorConfiguration::taskAssignor; assignmentListener = assignorConfiguration.assignmentListener(); uniqueField = 0; + clientTags = referenceContainer.clientTags; } @Override @@ -265,7 +267,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf taskOffsetSums, uniqueField, assignmentErrorCode.get(), - Collections.emptyMap() + clientTags ).encode(); } @@ -338,7 +340,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf futureMetadataVersion = usedVersion; 
processId = FUTURE_ID; if (!clientMetadataMap.containsKey(FUTURE_ID)) { - clientMetadataMap.put(FUTURE_ID, new ClientMetadata(null)); + clientMetadataMap.put(FUTURE_ID, new ClientMetadata(null, Collections.emptyMap())); } } else { processId = info.processId(); @@ -348,7 +350,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf // create the new client metadata if necessary if (clientMetadata == null) { - clientMetadata = new ClientMetadata(info.userEndPoint()); + clientMetadata = new ClientMetadata(info.userEndPoint(), info.clientTags()); clientMetadataMap.put(info.processId(), clientMetadata); } @@ -1462,6 +1464,10 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf return uniqueField; } + protected Map clientTags() { + return clientTags; + } + protected void handleRebalanceStart(final Set topics) { taskManager.handleRebalanceStart(topics); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java index 65cc7ae1930..5e317efdba9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java @@ -29,7 +29,6 @@ import org.apache.kafka.streams.processor.internals.ClientUtils; import org.apache.kafka.streams.processor.internals.InternalTopicManager; import org.slf4j.Logger; -import java.util.Collections; import java.util.List; import java.util.Map; @@ -250,7 +249,7 @@ public final class AssignorConfiguration { maxWarmupReplicas = configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG); numStandbyReplicas = configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG); probingRebalanceIntervalMs = configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG); - 
rackAwareAssignmentTags = Collections.emptyList(); + rackAwareAssignmentTags = configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG); } AssignmentConfigs(final Long acceptableRecoveryLag, @@ -262,7 +261,7 @@ public final class AssignorConfiguration { this.maxWarmupReplicas = validated(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, maxWarmupReplicas); this.numStandbyReplicas = validated(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, numStandbyReplicas); this.probingRebalanceIntervalMs = validated(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, probingRebalanceIntervalMs); - this.rackAwareAssignmentTags = rackAwareAssignmentTags; + this.rackAwareAssignmentTags = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, rackAwareAssignmentTags); } private static T validated(final String configKey, final T value) { @@ -280,6 +279,7 @@ public final class AssignorConfiguration { "\n maxWarmupReplicas=" + maxWarmupReplicas + "\n numStandbyReplicas=" + numStandbyReplicas + "\n probingRebalanceIntervalMs=" + probingRebalanceIntervalMs + + "\n rackAwareAssignmentTags=" + rackAwareAssignmentTags + "\n}"; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java index 5ee0e93e6aa..b8ba4ce27e1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java @@ -29,17 +29,16 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.SortedSet; -import java.util.stream.Collectors; import java.util.TreeMap; import java.util.TreeSet; import java.util.UUID; +import java.util.stream.Collectors; import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableMap; import static java.util.Collections.unmodifiableSet; import static 
java.util.Comparator.comparing; import static java.util.Comparator.comparingLong; - import static org.apache.kafka.common.utils.Utils.union; import static org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.UNKNOWN_OFFSET_SUM; @@ -65,6 +64,10 @@ public class ClientState { this(0); } + public ClientState(final Map clientTags) { + this(0, clientTags); + } + ClientState(final int capacity) { this(capacity, Collections.emptyMap()); } @@ -422,6 +425,7 @@ public class ClientState { ") prevStandbyTasks: (" + previousStandbyTasks.taskIds() + ") changelogOffsetTotalsByTask: (" + taskOffsetSums.entrySet() + ") taskLagTotals: (" + taskLagTotals.entrySet() + + ") clientTags: (" + clientTags.entrySet() + ") capacity: " + capacity + " assigned: " + assignedTaskCount() + "]"; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java index c7399d7ed8a..cabfa545b10 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java @@ -60,7 +60,7 @@ class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor { final Map tasksToRemainingStandbys = computeTasksToRemainingStandbys( numStandbyReplicas, - allTaskIds + statefulTaskIds ); final Map> tagKeyToValues = new HashMap<>(); @@ -79,6 +79,7 @@ class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor { if (clientState.activeTasks().contains(statefulTaskId)) { assignStandbyTasksToClientsWithDifferentTags( + numStandbyReplicas, standbyTaskClientsByTaskLoad, statefulTaskId, clientId, @@ -94,17 +95,10 @@ class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor { } if (!tasksToRemainingStandbys.isEmpty()) { - 
log.debug("Rack aware standby task assignment was not able to assign all standby tasks. " + - "tasksToRemainingStandbys=[{}], pendingStandbyTasksToClientId=[{}]. " + - "Will distribute the remaining standby tasks to least loaded clients.", - tasksToRemainingStandbys, pendingStandbyTasksToClientId); - assignPendingStandbyTasksToLeastLoadedClients(clients, numStandbyReplicas, - rackAwareAssignmentTags, standbyTaskClientsByTaskLoad, - tasksToRemainingStandbys, - pendingStandbyTasksToClientId); + tasksToRemainingStandbys); } // returning false, because standby task assignment will never require a follow-up probing rebalance. @@ -113,34 +107,22 @@ class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor { private static void assignPendingStandbyTasksToLeastLoadedClients(final Map clients, final int numStandbyReplicas, - final Set rackAwareAssignmentTags, final ConstrainedPrioritySet standbyTaskClientsByTaskLoad, - final Map pendingStandbyTaskToNumberRemainingStandbys, - final Map pendingStandbyTaskToClientId) { + final Map pendingStandbyTaskToNumberRemainingStandbys) { // We need to re offer all the clients to find the least loaded ones standbyTaskClientsByTaskLoad.offerAll(clients.keySet()); for (final Entry pendingStandbyTaskAssignmentEntry : pendingStandbyTaskToNumberRemainingStandbys.entrySet()) { final TaskId activeTaskId = pendingStandbyTaskAssignmentEntry.getKey(); - final UUID clientId = pendingStandbyTaskToClientId.get(activeTaskId); - final int numberOfRemainingStandbys = pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks( + pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks( + numStandbyReplicas, clients, pendingStandbyTaskToNumberRemainingStandbys, standbyTaskClientsByTaskLoad, - activeTaskId + activeTaskId, + log ); - - if (numberOfRemainingStandbys > 0) { - log.warn("Unable to assign {} of {} standby tasks for task [{}] with client tags [{}]. " + - "There is not enough available capacity. 
You should " + - "increase the number of application instances " + - "on different client tag dimensions " + - "to maintain the requested number of standby replicas. " + - "Rack awareness is configured with [{}] tags.", - numberOfRemainingStandbys, numStandbyReplicas, activeTaskId, - clients.get(clientId).clientTags(), rackAwareAssignmentTags); - } } } @@ -174,7 +156,8 @@ class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor { } // Visible for testing - static void assignStandbyTasksToClientsWithDifferentTags(final ConstrainedPrioritySet standbyTaskClientsByTaskLoad, + static void assignStandbyTasksToClientsWithDifferentTags(final int numberOfStandbyClients, + final ConstrainedPrioritySet standbyTaskClientsByTaskLoad, final TaskId activeTaskId, final UUID activeTaskClient, final Set rackAwareAssignmentTags, @@ -211,17 +194,32 @@ class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor { break; } - clientStates.get(clientOnUnusedTagDimensions).assignStandby(activeTaskId); - + final ClientState clientStateOnUsedTagDimensions = clientStates.get(clientOnUnusedTagDimensions); countOfUsedClients++; numRemainingStandbys--; + log.debug("Assigning {} out of {} standby tasks for an active task [{}] with client tags {}. " + + "Standby task client tags are {}.", + numberOfStandbyClients - numRemainingStandbys, numberOfStandbyClients, activeTaskId, + clientStates.get(activeTaskClient).clientTags(), clientStateOnUsedTagDimensions.clientTags()); + + clientStateOnUsedTagDimensions.assignStandby(activeTaskId); lastUsedClient = clientOnUnusedTagDimensions; } while (numRemainingStandbys > 0); if (numRemainingStandbys > 0) { pendingStandbyTasksToClientId.put(activeTaskId, activeTaskClient); tasksToRemainingStandbys.put(activeTaskId, numRemainingStandbys); + log.warn("Rack aware standby task assignment was not able to assign {} of {} standby tasks for the " + + "active task [{}] with the rack aware assignment tags {}. 
" + + "This may happen when there aren't enough application instances on different tag " + + "dimensions compared to an active and corresponding standby task. " + + "Consider launching application instances on different tag dimensions than [{}]. " + + "Standby task assignment will fall back to assigning standby tasks to the least loaded clients.", + numRemainingStandbys, numberOfStandbyClients, + activeTaskId, rackAwareAssignmentTags, + clientStates.get(activeTaskClient).clientTags()); + } else { tasksToRemainingStandbys.remove(activeTaskId); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultStandbyTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultStandbyTaskAssignor.java index db6cb4e26ce..680a056a826 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultStandbyTaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultStandbyTaskAssignor.java @@ -50,18 +50,12 @@ class DefaultStandbyTaskAssignor implements StandbyTaskAssignor { standbyTaskClientsByTaskLoad.offerAll(clients.keySet()); for (final TaskId task : statefulTaskIds) { - final int numRemainingStandbys = pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks(clients, - tasksToRemainingStandbys, - standbyTaskClientsByTaskLoad, - task); - - if (numRemainingStandbys > 0) { - log.warn("Unable to assign {} of {} standby tasks for task [{}]. " + - "There is not enough available capacity. 
You should " + - "increase the number of application instances " + - "to maintain the requested number of standby replicas.", - numRemainingStandbys, numStandbyReplicas, task); - } + pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks(numStandbyReplicas, + clients, + tasksToRemainingStandbys, + standbyTaskClientsByTaskLoad, + task, + log); } // returning false, because standby task assignment will never require a follow-up probing rebalance. diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java index d0bb50b66ff..7111ae28e5f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java @@ -129,7 +129,7 @@ public class HighAvailabilityTaskAssignor implements TaskAssignor { return; } - final StandbyTaskAssignor standbyTaskAssignor = createStandbyTaskAssignor(configs); + final StandbyTaskAssignor standbyTaskAssignor = StandbyTaskAssignorFactory.create(configs); standbyTaskAssignor.assign(clientStates, allTaskIds, statefulTasks, configs); @@ -142,15 +142,6 @@ public class HighAvailabilityTaskAssignor implements TaskAssignor { ); } - // Visible for testing - static StandbyTaskAssignor createStandbyTaskAssignor(final AssignmentConfigs configs) { - if (!configs.rackAwareAssignmentTags.isEmpty()) { - return new ClientTagAwareStandbyTaskAssignor(); - } else { - return new DefaultStandbyTaskAssignor(); - } - } - private static void balanceTasksOverThreads(final SortedMap clientStates, final Function> currentAssignmentAccessor, final BiConsumer taskUnassignor, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ReferenceContainer.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ReferenceContainer.java index 9b46eeb7108..19011d865a1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ReferenceContainer.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ReferenceContainer.java @@ -24,6 +24,7 @@ import org.apache.kafka.streams.processor.internals.StreamsMetadataState; import org.apache.kafka.streams.processor.internals.TaskManager; import java.util.LinkedList; +import java.util.Map; import java.util.Queue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -37,4 +38,5 @@ public class ReferenceContainer { public final AtomicLong nextScheduledRebalanceMs = new AtomicLong(Long.MAX_VALUE); public final Queue nonFatalExceptionsToHandle = new LinkedList<>(); public Time time; + public Map clientTags; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignmentUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignmentUtils.java index 7ed6f5dec1b..3f34e5ef8c1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignmentUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignmentUtils.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.processor.internals.assignment; import org.apache.kafka.streams.processor.TaskId; +import org.slf4j.Logger; import java.util.Map; import java.util.Set; @@ -33,10 +34,12 @@ final class StandbyTaskAssignmentUtils { client -> clients.get(client).assignedTaskLoad()); } - static int pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks(final Map clients, - final Map tasksToRemainingStandbys, - final ConstrainedPrioritySet standbyTaskClientsByTaskLoad, - final TaskId activeTaskId) { + static void 
pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks(final int numStandbyReplicas, + final Map clients, + final Map tasksToRemainingStandbys, + final ConstrainedPrioritySet standbyTaskClientsByTaskLoad, + final TaskId activeTaskId, + final Logger log) { int numRemainingStandbys = tasksToRemainingStandbys.get(activeTaskId); while (numRemainingStandbys > 0) { final UUID client = standbyTaskClientsByTaskLoad.poll(activeTaskId); @@ -49,7 +52,13 @@ final class StandbyTaskAssignmentUtils { tasksToRemainingStandbys.put(activeTaskId, numRemainingStandbys); } - return numRemainingStandbys; + if (numRemainingStandbys > 0) { + log.warn("Unable to assign {} of {} standby tasks for task [{}]. " + + "There is not enough available capacity. You should " + + "increase the number of application instances " + + "to maintain the requested number of standby replicas.", + numRemainingStandbys, numStandbyReplicas, activeTaskId); + } } static Map computeTasksToRemainingStandbys(final int numStandbyReplicas, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignorFactory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignorFactory.java new file mode 100644 index 00000000000..30c78f33f38 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignorFactory.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.assignment; + +class StandbyTaskAssignorFactory { + private StandbyTaskAssignorFactory() {} + + static StandbyTaskAssignor create(final AssignorConfiguration.AssignmentConfigs configs) { + if (!configs.rackAwareAssignmentTags.isEmpty()) { + return new ClientTagAwareStandbyTaskAssignor(); + } else { + return new DefaultStandbyTaskAssignor(); + } + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index 2e1b0d842e1..05b13a52e50 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -38,19 +38,25 @@ import org.junit.Test; import java.io.File; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Properties; +import static java.util.Collections.nCopies; import static org.apache.kafka.common.IsolationLevel.READ_COMMITTED; import static org.apache.kafka.common.IsolationLevel.READ_UNCOMMITTED; +import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE; import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE; import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_BETA; import static 
org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_V2; +import static org.apache.kafka.streams.StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH; +import static org.apache.kafka.streams.StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH; import static org.apache.kafka.streams.StreamsConfig.STATE_DIR_CONFIG; import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG; import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix; @@ -1124,6 +1130,105 @@ public class StreamsConfigTest { assertThrows(ConfigException.class, () -> new StreamsConfig(props)); } + @Test + public void shouldDefaultToEmptyListIfRackAwareAssignmentTagsIsNotSet() { + final StreamsConfig config = new StreamsConfig(props); + assertTrue(config.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG).isEmpty()); + } + + @Test + public void shouldThrowExceptionWhenClientTagsExceedTheLimit() { + final int limit = StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE + 1; + for (int i = 0; i < limit; i++) { + props.put(StreamsConfig.clientTagPrefix("k" + i), "v" + i); + } + final ConfigException exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props)); + assertEquals( + String.format("At most %s client tags can be specified using %s prefix.", + StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE, + StreamsConfig.CLIENT_TAG_PREFIX + ), exception.getMessage() + ); + } + + @Test + public void shouldThrowExceptionWhenRackAwareAssignmentTagsExceedsMaxListSize() { + final int limit = StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE + 1; + final List rackAwareAssignmentTags = new ArrayList<>(); + for (int i = 0; i < limit; i++) { + final String clientTagKey = "k" + i; + rackAwareAssignmentTags.add(clientTagKey); + props.put(StreamsConfig.clientTagPrefix(clientTagKey), "v" + i); + } + + props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, String.join(",", rackAwareAssignmentTags)); + final ConfigException exception = 
assertThrows(ConfigException.class, () -> new StreamsConfig(props)); + assertEquals( + String.format("Invalid value %s for configuration %s: exceeds maximum list size of [%s].", + rackAwareAssignmentTags, + StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, + StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE), + exception.getMessage() + ); + } + + @Test + public void shouldSetRackAwareAssignmentTags() { + props.put(StreamsConfig.clientTagPrefix("cluster"), "cluster-1"); + props.put(StreamsConfig.clientTagPrefix("zone"), "eu-central-1a"); + props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, "cluster,zone"); + final StreamsConfig config = new StreamsConfig(props); + assertEquals(new HashSet<>(config.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG)), + mkSet("cluster", "zone")); + } + + @Test + public void shouldGetEmptyMapIfClientTagsAreNotSet() { + final StreamsConfig config = new StreamsConfig(props); + assertTrue(config.getClientTags().isEmpty()); + } + + @Test + public void shouldGetClientTagsMapWhenSet() { + props.put(StreamsConfig.clientTagPrefix("zone"), "eu-central-1a"); + props.put(StreamsConfig.clientTagPrefix("cluster"), "cluster-1"); + final StreamsConfig config = new StreamsConfig(props); + final Map clientTags = config.getClientTags(); + assertEquals(clientTags.size(), 2); + assertEquals(clientTags.get("zone"), "eu-central-1a"); + assertEquals(clientTags.get("cluster"), "cluster-1"); + } + + @Test + public void shouldThrowExceptionWhenClientTagRackAwarenessIsConfiguredWithUnknownTags() { + props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, "cluster"); + assertThrows(ConfigException.class, () -> new StreamsConfig(props)); + } + + @Test + public void shouldThrowExceptionWhenClientTagKeyExceedMaxLimit() { + final String key = String.join("", nCopies(MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH + 1, "k")); + props.put(StreamsConfig.clientTagPrefix(key), "eu-central-1a"); + final ConfigException exception = 
assertThrows(ConfigException.class, () -> new StreamsConfig(props)); + assertEquals( + String.format("Invalid value %s for configuration %s: Tag key exceeds maximum length of %s.", + key, StreamsConfig.CLIENT_TAG_PREFIX, StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH), + exception.getMessage() + ); + } + + @Test + public void shouldThrowExceptionWhenClientTagValueExceedMaxLimit() { + final String value = String.join("", nCopies(MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH + 1, "v")); + props.put(StreamsConfig.clientTagPrefix("x"), value); + final ConfigException exception = assertThrows(ConfigException.class, () -> new StreamsConfig(props)); + assertEquals( + String.format("Invalid value %s for configuration %s: Tag value exceeds maximum length of %s.", + value, StreamsConfig.CLIENT_TAG_PREFIX, StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH), + exception.getMessage() + ); + } + static class MisconfiguredSerde implements Serde { @Override public void configure(final Map configs, final boolean isKey) { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java new file mode 100644 index 00000000000..f4afbe8012c --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.ThreadMetadata; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Repartitioned; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static java.util.Arrays.asList; +import static 
java.util.Collections.singletonList; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@Category({IntegrationTest.class}) +public class RackAwarenessIntegrationTest { + private static final int NUM_BROKERS = 1; + + private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); + + private static final String TAG_VALUE_K8_CLUSTER_1 = "k8s-cluster-1"; + private static final String TAG_VALUE_K8_CLUSTER_2 = "k8s-cluster-2"; + private static final String TAG_VALUE_K8_CLUSTER_3 = "k8s-cluster-3"; + private static final String TAG_VALUE_EU_CENTRAL_1A = "eu-central-1a"; + private static final String TAG_VALUE_EU_CENTRAL_1B = "eu-central-1b"; + private static final String TAG_VALUE_EU_CENTRAL_1C = "eu-central-1c"; + + private static final int DEFAULT_NUMBER_OF_STATEFUL_SUB_TOPOLOGIES = 1; + private static final int DEFAULT_NUMBER_OF_PARTITIONS_OF_SUB_TOPOLOGIES = 2; + + @Rule + public TestName testName = new TestName(); + + private static final String INPUT_TOPIC = "input-topic"; + + private static final String TAG_ZONE = "zone"; + private static final String TAG_CLUSTER = "cluster"; + + private List kafkaStreamsInstances; + private Properties baseConfiguration; + private Topology topology; + + @BeforeClass + public static void createTopics() throws Exception { + CLUSTER.start(); + CLUSTER.createTopic(INPUT_TOPIC, 6, 1); + } + + @Before + public void setup() { + kafkaStreamsInstances = new ArrayList<>(); + baseConfiguration = new Properties(); + final String safeTestName = safeUniqueTestName(getClass(), testName); + final String applicationId = "app-" + safeTestName; + baseConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); + baseConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + baseConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath()); + baseConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); + baseConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); + } + + @After + public void cleanup() throws IOException { + for (final KafkaStreamsWithConfiguration kafkaStreamsWithConfiguration : kafkaStreamsInstances) { + kafkaStreamsWithConfiguration.kafkaStreams.close(Duration.ofMillis(IntegrationTestUtils.DEFAULT_TIMEOUT)); + IntegrationTestUtils.purgeLocalStreamsState(kafkaStreamsWithConfiguration.configuration); + } + kafkaStreamsInstances.clear(); + } + + @Test + public void shouldDoRebalancingWithMaximumNumberOfClientTags() throws Exception { + initTopology(3, 3); + final int numberOfStandbyReplicas = 1; + + final List clientTagKeys = new ArrayList<>(); + final Map clientTags1 = new HashMap<>(); + final Map clientTags2 = new HashMap<>(); + + for (int i = 0; i < StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE; i++) { + clientTagKeys.add("key-" + i); + } + + for (int i = 0; i < clientTagKeys.size(); i++) { + final String key = clientTagKeys.get(i); + clientTags1.put(key, "value-1-" + i); + clientTags2.put(key, "value-2-" + i); + } + + assertEquals(StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE, clientTagKeys.size()); + Stream.of(clientTags1, clientTags2) + .forEach(clientTags -> assertEquals(String.format("clientsTags with content '%s' " + + "did not match expected size", clientTags), + StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE, + clientTags.size())); + + createAndStart(clientTags1, clientTagKeys, numberOfStandbyReplicas); + createAndStart(clientTags1, clientTagKeys, numberOfStandbyReplicas); + createAndStart(clientTags2, clientTagKeys, numberOfStandbyReplicas); + + waitUntilAllKafkaStreamsClientsAreRunning(); + assertTrue(isIdealTaskDistributionReachedForTags(clientTagKeys)); + + stopKafkaStreamsInstanceWithIndex(0); + + 
waitUntilAllKafkaStreamsClientsAreRunning(); + + assertTrue(isIdealTaskDistributionReachedForTags(clientTagKeys)); + } + + @Test + public void shouldDistributeStandbyReplicasWhenAllClientsAreLocatedOnASameClusterTag() throws Exception { + initTopology(); + final int numberOfStandbyReplicas = 1; + createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1A, TAG_VALUE_K8_CLUSTER_1), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas); + createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1A, TAG_VALUE_K8_CLUSTER_1), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas); + createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1A, TAG_VALUE_K8_CLUSTER_1), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas); + + createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1B, TAG_VALUE_K8_CLUSTER_1), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas); + createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1B, TAG_VALUE_K8_CLUSTER_1), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas); + createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1B, TAG_VALUE_K8_CLUSTER_1), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas); + + waitUntilAllKafkaStreamsClientsAreRunning(); + assertTrue(isIdealTaskDistributionReachedForTags(singletonList(TAG_ZONE))); + } + + @Test + public void shouldDistributeStandbyReplicasOverMultipleClientTags() throws Exception { + initTopology(); + final int numberOfStandbyReplicas = 2; + + createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1A, TAG_VALUE_K8_CLUSTER_1), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas); + createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1B, TAG_VALUE_K8_CLUSTER_1), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas); + createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1C, TAG_VALUE_K8_CLUSTER_1), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas); + + createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1A, TAG_VALUE_K8_CLUSTER_2), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas); + 
createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1B, TAG_VALUE_K8_CLUSTER_2), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas); + createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1C, TAG_VALUE_K8_CLUSTER_2), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas); + + createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1A, TAG_VALUE_K8_CLUSTER_3), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas); + createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1B, TAG_VALUE_K8_CLUSTER_3), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas); + createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1C, TAG_VALUE_K8_CLUSTER_3), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas); + + waitUntilAllKafkaStreamsClientsAreRunning(); + assertTrue(isIdealTaskDistributionReachedForTags(asList(TAG_ZONE, TAG_CLUSTER))); + } + + @Test + public void shouldDistributeStandbyReplicasWhenIdealDistributionCanNotBeAchieved() throws Exception { + initTopology(); + final int numberOfStandbyReplicas = 2; + + createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1A, TAG_VALUE_K8_CLUSTER_1), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas); + createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1B, TAG_VALUE_K8_CLUSTER_1), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas); + createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1C, TAG_VALUE_K8_CLUSTER_1), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas); + + createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1A, TAG_VALUE_K8_CLUSTER_2), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas); + createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1B, TAG_VALUE_K8_CLUSTER_2), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas); + createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1C, TAG_VALUE_K8_CLUSTER_2), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas); + + waitUntilAllKafkaStreamsClientsAreRunning(); + + assertTrue(isIdealTaskDistributionReachedForTags(singletonList(TAG_ZONE))); + 
assertTrue(isPartialTaskDistributionReachedForTags(singletonList(TAG_CLUSTER))); + } + + private void stopKafkaStreamsInstanceWithIndex(final int index) { + kafkaStreamsInstances.get(index).kafkaStreams.close(Duration.ofMillis(IntegrationTestUtils.DEFAULT_TIMEOUT)); + kafkaStreamsInstances.remove(index); + } + + private void waitUntilAllKafkaStreamsClientsAreRunning() throws Exception { + waitUntilAllKafkaStreamsClientsAreRunning(Duration.ofMillis(IntegrationTestUtils.DEFAULT_TIMEOUT)); + } + + private void waitUntilAllKafkaStreamsClientsAreRunning(final Duration timeout) throws Exception { + IntegrationTestUtils.waitForApplicationState(kafkaStreamsInstances.stream().map(it -> it.kafkaStreams).collect(Collectors.toList()), + KafkaStreams.State.RUNNING, + timeout); + } + + private boolean isPartialTaskDistributionReachedForTags(final Collection tagsToCheck) { + final Predicate partialTaskClientTagDistributionTest = taskClientTagDistribution -> { + final Map activeTaskClientTags = taskClientTagDistribution.activeTaskClientTags.clientTags; + return tagsAmongstActiveAndAtLeastOneStandbyTaskIsDifferent(taskClientTagDistribution.standbyTasksClientTags, activeTaskClientTags, tagsToCheck); + }; + + return isTaskDistributionTestSuccessful(partialTaskClientTagDistributionTest); + } + + private boolean isIdealTaskDistributionReachedForTags(final Collection tagsToCheck) { + final Predicate idealTaskClientTagDistributionTest = taskClientTagDistribution -> { + final Map activeTaskClientTags = taskClientTagDistribution.activeTaskClientTags.clientTags; + return tagsAmongstStandbyTasksAreDifferent(taskClientTagDistribution.standbyTasksClientTags, tagsToCheck) + && tagsAmongstActiveAndAllStandbyTasksAreDifferent(taskClientTagDistribution.standbyTasksClientTags, + activeTaskClientTags, + tagsToCheck); + }; + + return isTaskDistributionTestSuccessful(idealTaskClientTagDistributionTest); + } + + private boolean isTaskDistributionTestSuccessful(final Predicate 
taskClientTagDistributionPredicate) { + final List tasksClientTagDistributions = getTasksClientTagDistributions(); + + if (tasksClientTagDistributions.isEmpty()) { + return false; + } + + return tasksClientTagDistributions.stream().allMatch(taskClientTagDistributionPredicate); + } + + private static boolean tagsAmongstActiveAndAllStandbyTasksAreDifferent(final Collection standbyTasks, + final Map activeTaskClientTags, + final Collection tagsToCheck) { + return standbyTasks.stream().allMatch(standbyTask -> tagsToCheck.stream().noneMatch(tag -> activeTaskClientTags.get(tag).equals(standbyTask.clientTags.get(tag)))); + } + + private static boolean tagsAmongstActiveAndAtLeastOneStandbyTaskIsDifferent(final Collection standbyTasks, + final Map activeTaskClientTags, + final Collection tagsToCheck) { + return standbyTasks.stream().anyMatch(standbyTask -> tagsToCheck.stream().noneMatch(tag -> activeTaskClientTags.get(tag).equals(standbyTask.clientTags.get(tag)))); + } + + private static boolean tagsAmongstStandbyTasksAreDifferent(final Collection standbyTasks, final Collection tagsToCheck) { + final Map statistics = new HashMap<>(); + + for (final TaskClientTags standbyTask : standbyTasks) { + for (final String tag : tagsToCheck) { + final String tagValue = standbyTask.clientTags.get(tag); + final Integer tagValueOccurrence = statistics.getOrDefault(tagValue, 0); + statistics.put(tagValue, tagValueOccurrence + 1); + } + } + + return statistics.values().stream().noneMatch(occurrence -> occurrence > 1); + } + + private void initTopology() { + initTopology(DEFAULT_NUMBER_OF_PARTITIONS_OF_SUB_TOPOLOGIES, DEFAULT_NUMBER_OF_STATEFUL_SUB_TOPOLOGIES); + } + + private void initTopology(final int numberOfPartitionsOfSubTopologies, final int numberOfStatefulSubTopologies) { + final StreamsBuilder builder = new StreamsBuilder(); + final String stateStoreName = "myTransformState"; + + final StoreBuilder> keyValueStoreBuilder = Stores.keyValueStoreBuilder( + 
Stores.persistentKeyValueStore(stateStoreName), + Serdes.Integer(), + Serdes.Integer() + ); + + builder.addStateStore(keyValueStoreBuilder); + + final KStream stream = builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.Integer())); + + // Stateless sub-topology + stream.repartition(Repartitioned.numberOfPartitions(numberOfPartitionsOfSubTopologies)).filter((k, v) -> true); + + // Stateful sub-topologies + for (int i = 0; i < numberOfStatefulSubTopologies; i++) { + stream.repartition(Repartitioned.numberOfPartitions(numberOfPartitionsOfSubTopologies)) + .groupByKey() + .reduce(Integer::sum); + } + + topology = builder.build(); + } + + private List getTasksClientTagDistributions() { + final List taskClientTags = new ArrayList<>(); + + for (final KafkaStreamsWithConfiguration kafkaStreamsInstance : kafkaStreamsInstances) { + final StreamsConfig config = new StreamsConfig(kafkaStreamsInstance.configuration); + for (final ThreadMetadata localThreadsMetadata : kafkaStreamsInstance.kafkaStreams.metadataForLocalThreads()) { + localThreadsMetadata.activeTasks().forEach(activeTask -> { + final TaskId activeTaskId = activeTask.taskId(); + final Map clientTags = config.getClientTags(); + + final List standbyTasks = findStandbysForActiveTask(activeTaskId); + + if (!standbyTasks.isEmpty()) { + final TaskClientTags activeTaskView = new TaskClientTags(activeTaskId, clientTags); + taskClientTags.add(new TaskClientTagDistribution(activeTaskView, standbyTasks)); + } + }); + + } + } + + return taskClientTags; + } + + private List findStandbysForActiveTask(final TaskId taskId) { + final List standbyTasks = new ArrayList<>(); + + for (final KafkaStreamsWithConfiguration kafkaStreamsInstance : kafkaStreamsInstances) { + for (final ThreadMetadata localThreadsMetadata : kafkaStreamsInstance.kafkaStreams.metadataForLocalThreads()) { + localThreadsMetadata.standbyTasks().forEach(standbyTask -> { + final TaskId standbyTaskId = standbyTask.taskId(); + if 
(taskId.equals(standbyTaskId)) { + final StreamsConfig config = new StreamsConfig(kafkaStreamsInstance.configuration); + standbyTasks.add(new TaskClientTags(standbyTaskId, config.getClientTags())); + } + }); + } + } + + return standbyTasks; + } + + private static Map buildClientTags(final String zone, final String cluster) { + final Map clientTags = new HashMap<>(); + + clientTags.put(TAG_ZONE, zone); + clientTags.put(TAG_CLUSTER, cluster); + + return clientTags; + } + + private void createAndStart(final Map clientTags, + final Collection rackAwareAssignmentTags, + final int numberOfStandbyReplicas) { + final Properties streamsConfiguration = createStreamsConfiguration(clientTags, rackAwareAssignmentTags, numberOfStandbyReplicas); + final KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsConfiguration); + + kafkaStreamsInstances.add(new KafkaStreamsWithConfiguration(streamsConfiguration, kafkaStreams)); + + kafkaStreams.start(); + } + + private Properties createStreamsConfiguration(final Map clientTags, + final Collection rackAwareAssignmentTags, + final int numStandbyReplicas) { + final Properties streamsConfiguration = new Properties(); + streamsConfiguration.putAll(baseConfiguration); + streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, numStandbyReplicas); + streamsConfiguration.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, String.join(",", rackAwareAssignmentTags)); + clientTags.forEach((key, value) -> streamsConfiguration.put(StreamsConfig.clientTagPrefix(key), value)); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(String.join("-", clientTags.values())).getPath()); + return streamsConfiguration; + } + + private static final class KafkaStreamsWithConfiguration { + private final Properties configuration; + private final KafkaStreams kafkaStreams; + + KafkaStreamsWithConfiguration(final Properties configuration, final KafkaStreams kafkaStreams) { + this.configuration = configuration; + 
this.kafkaStreams = kafkaStreams; + } + } + + private static final class TaskClientTagDistribution { + private final TaskClientTags activeTaskClientTags; + private final List standbyTasksClientTags; + + TaskClientTagDistribution(final TaskClientTags activeTaskClientTags, final List standbyTasksClientTags) { + this.activeTaskClientTags = activeTaskClientTags; + this.standbyTasksClientTags = standbyTasksClientTags; + } + + @Override + public String toString() { + return "TaskDistribution{" + + "activeTaskClientTagsView=" + activeTaskClientTags + + ", standbyTasks=" + standbyTasksClientTags + + '}'; + } + } + + private static final class TaskClientTags { + private final TaskId taskId; + private final Map clientTags; + + TaskClientTags(final TaskId taskId, final Map clientTags) { + this.taskId = taskId; + this.clientTags = clientTags; + } + + @Override + public String toString() { + return "TaskClientTags{" + + "taskId=" + taskId + + ", clientTags=" + clientTags + + '}'; + } + } +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index d11f3e056cf..e2c08ed4b62 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -16,8 +16,6 @@ */ package org.apache.kafka.streams.processor.internals; -import java.time.Duration; -import java.util.Properties; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.ListOffsetsResult; @@ -79,6 +77,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import 
java.util.Collections; @@ -86,6 +85,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.SortedSet; import java.util.UUID; @@ -196,6 +196,7 @@ public class StreamsPartitionAssignorTest { private StreamsMetadataState streamsMetadataState = EasyMock.createNiceMock(StreamsMetadataState.class); private final Map subscriptions = new HashMap<>(); private final Class taskAssignor; + private Map clientTags; private final ReferenceContainer referenceContainer = new ReferenceContainer(); private final MockTime time = new MockTime(); @@ -210,6 +211,7 @@ public class StreamsPartitionAssignorTest { referenceContainer.taskManager = taskManager; referenceContainer.streamsMetadataState = streamsMetadataState; referenceContainer.time = time; + referenceContainer.clientTags = clientTags != null ? clientTags : EMPTY_CLIENT_TAGS; configurationMap.put(InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, referenceContainer); configurationMap.put(InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, taskAssignor.getName()); return configurationMap; @@ -2190,6 +2192,21 @@ public class StreamsPartitionAssignorTest { equalTo(AssignorError.ASSIGNMENT_ERROR.code())); } + @Test + public void testClientTags() { + clientTags = mkMap(mkEntry("cluster", "cluster1"), mkEntry("zone", "az1")); + createDefaultMockTaskManager(); + configureDefaultPartitionAssignor(); + final Set topics = mkSet("input"); + final Subscription subscription = new Subscription(new ArrayList<>(topics), + partitionAssignor.subscriptionUserData(topics)); + final SubscriptionInfo info = getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS, uniqueField, clientTags); + + assertEquals(singletonList("input"), subscription.topics()); + assertEquals(info, SubscriptionInfo.decode(subscription.userData())); + assertEquals(clientTags, partitionAssignor.clientTags()); + } + private static class CorruptedInternalTopologyBuilder extends 
InternalTopologyBuilder { private Map corruptedTopicGroups; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java index cf8a6b297ab..42a32c04b95 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java @@ -166,6 +166,15 @@ public final class AssignmentTestUtils { LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks, standbyTasks), uniqueField, 0, EMPTY_CLIENT_TAGS); } + public static SubscriptionInfo getInfo(final UUID processId, + final Set prevTasks, + final Set standbyTasks, + final byte uniqueField, + final Map clientTags) { + return new SubscriptionInfo( + LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION, processId, null, getTaskOffsetSums(prevTasks, standbyTasks), uniqueField, 0, clientTags); + } + // Stub offset sums for when we only care about the prev/standby task sets, not the actual offsets private static Map getTaskOffsetSums(final Collection activeTasks, final Collection standbyTasks) { final Map taskOffsetSums = activeTasks.stream().collect(Collectors.toMap(t -> t, t -> Task.LATEST_OFFSET)); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java index 8a983dee9be..631430c6a82 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java @@ -81,8 +81,43 @@ public class 
ClientTagAwareStandbyTaskAssignorTest { standbyTaskAssignor = new ClientTagAwareStandbyTaskAssignor(); } + @Test + public void shouldNotAssignStatelessTasksToAnyClients() { + final Set statefulTasks = mkSet( + TASK_1_0, + TASK_1_1, + TASK_1_2 + ); + + final Map clientStates = mkMap( + mkEntry(UUID_1, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_1), mkEntry(CLUSTER_TAG, CLUSTER_1)), TASK_0_0, TASK_1_0)), + mkEntry(UUID_2, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_2), mkEntry(CLUSTER_TAG, CLUSTER_1)))), + mkEntry(UUID_3, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_3), mkEntry(CLUSTER_TAG, CLUSTER_1)))), + + mkEntry(UUID_4, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_1), mkEntry(CLUSTER_TAG, CLUSTER_2)), TASK_0_1, TASK_1_1)), + mkEntry(UUID_5, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_2), mkEntry(CLUSTER_TAG, CLUSTER_2)))), + mkEntry(UUID_6, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_3), mkEntry(CLUSTER_TAG, CLUSTER_2)))), + + mkEntry(UUID_7, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_1), mkEntry(CLUSTER_TAG, CLUSTER_3)), TASK_0_2, TASK_1_2)), + mkEntry(UUID_8, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_2), mkEntry(CLUSTER_TAG, CLUSTER_3)))), + mkEntry(UUID_9, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_3), mkEntry(CLUSTER_TAG, CLUSTER_3)))) + ); + + final Set allActiveTasks = findAllActiveTasks(clientStates); + + final AssignmentConfigs assignmentConfigs = newAssignmentConfigs(2, ZONE_TAG, CLUSTER_TAG); + + standbyTaskAssignor.assign(clientStates, allActiveTasks, statefulTasks, assignmentConfigs); + + final Set statelessTasks = allActiveTasks.stream().filter(taskId -> !statefulTasks.contains(taskId)).collect(Collectors.toSet()); + assertTrue( + clientStates.values().stream().allMatch(clientState -> statelessTasks.stream().noneMatch(clientState::hasStandbyTask)) + ); + } + @Test public void 
shouldRemoveClientToRemainingStandbysAndNotPopulatePendingStandbyTasksToClientIdWhenAllStandbyTasksWereAssigned() { + final int numStandbyReplicas = 2; final Set rackAwareAssignmentTags = mkSet(ZONE_TAG, CLUSTER_TAG); final Map clientStates = mkMap( mkEntry(UUID_1, createClientStateWithCapacity(2, mkMap(mkEntry(ZONE_TAG, ZONE_1), mkEntry(CLUSTER_TAG, CLUSTER_1)), TASK_0_0)), @@ -102,10 +137,11 @@ public class ClientTagAwareStandbyTaskAssignorTest { fillClientsTagStatistics(clientStates, tagEntryToClients, tagKeyToValues); final Map pendingStandbyTasksToClientId = new HashMap<>(); - final Map tasksToRemainingStandbys = computeTasksToRemainingStandbys(2, allActiveTasks); + final Map tasksToRemainingStandbys = computeTasksToRemainingStandbys(numStandbyReplicas, allActiveTasks); for (final TaskId activeTaskId : allActiveTasks) { assignStandbyTasksToClientsWithDifferentTags( + numStandbyReplicas, constrainedPrioritySet, activeTaskId, taskToClientId.get(activeTaskId), @@ -132,6 +168,7 @@ public class ClientTagAwareStandbyTaskAssignorTest { ); final ConstrainedPrioritySet constrainedPrioritySet = createLeastLoadedPrioritySetConstrainedByAssignedTask(clientStates); + final int numStandbyReplicas = 3; final Set allActiveTasks = findAllActiveTasks(clientStates); final Map taskToClientId = mkMap(mkEntry(TASK_0_0, UUID_1), mkEntry(TASK_0_1, UUID_2), @@ -143,10 +180,11 @@ public class ClientTagAwareStandbyTaskAssignorTest { fillClientsTagStatistics(clientStates, tagEntryToClients, tagKeyToValues); final Map pendingStandbyTasksToClientId = new HashMap<>(); - final Map tasksToRemainingStandbys = computeTasksToRemainingStandbys(3, allActiveTasks); + final Map tasksToRemainingStandbys = computeTasksToRemainingStandbys(numStandbyReplicas, allActiveTasks); for (final TaskId activeTaskId : allActiveTasks) { assignStandbyTasksToClientsWithDifferentTags( + numStandbyReplicas, constrainedPrioritySet, activeTaskId, taskToClientId.get(activeTaskId), diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java index bf78db6457f..36ae42fded3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java @@ -20,10 +20,8 @@ import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; import org.junit.Test; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -31,7 +29,6 @@ import java.util.stream.Collectors; import static java.util.Collections.emptySet; import static java.util.Collections.singleton; -import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; @@ -68,7 +65,6 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class HighAvailabilityTaskAssignorTest { @@ -814,27 +810,6 @@ public class HighAvailabilityTaskAssignorTest { assertThat(probingRebalanceNeeded, is(false)); } - @Test - public void shouldReturnClientTagAwareStandbyTaskAssignorWhenRackAwareAssignmentTagsIsSet() { - final StandbyTaskAssignor standbyTaskAssignor = HighAvailabilityTaskAssignor.createStandbyTaskAssignor(newAssignmentConfigs(1, singletonList("az"))); - assertTrue(standbyTaskAssignor instanceof 
ClientTagAwareStandbyTaskAssignor); - } - - @Test - public void shouldReturnDefaultStandbyTaskAssignorWhenRackAwareAssignmentTagsIsEmpty() { - final StandbyTaskAssignor standbyTaskAssignor = HighAvailabilityTaskAssignor.createStandbyTaskAssignor(newAssignmentConfigs(1, Collections.emptyList())); - assertTrue(standbyTaskAssignor instanceof DefaultStandbyTaskAssignor); - } - - private static AssignorConfiguration.AssignmentConfigs newAssignmentConfigs(final int numStandbyReplicas, - final List rackAwareAssignmentTags) { - return new AssignorConfiguration.AssignmentConfigs(0L, - 1, - numStandbyReplicas, - 60000L, - rackAwareAssignmentTags); - } - private static void assertHasNoActiveTasks(final ClientState... clients) { for (final ClientState client : clients) { assertThat(client.activeTasks(), is(empty())); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignmentUtilsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignmentUtilsTest.java index 1abf1b92635..b13f04b2bd8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignmentUtilsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignmentUtilsTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals.assignment; import org.apache.kafka.streams.processor.TaskId; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; import java.util.Map; import java.util.Set; @@ -36,8 +37,16 @@ import static org.apache.kafka.streams.processor.internals.assignment.StandbyTas import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import 
static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; public class StandbyTaskAssignmentUtilsTest { + private static final Set ACTIVE_TASKS = mkSet(TASK_0_0, TASK_0_1, TASK_0_2); private Map clients; @@ -45,6 +54,7 @@ public class StandbyTaskAssignmentUtilsTest { @Before public void setup() { + clients = getClientStatesMap(ACTIVE_TASKS.stream().map(StandbyTaskAssignmentUtilsTest::mkState).toArray(ClientState[]::new)); clientsByTaskLoad = new ConstrainedPrioritySet( (client, task) -> !clients.get(client).hasAssignedTask(task), @@ -55,38 +65,38 @@ public class StandbyTaskAssignmentUtilsTest { @Test public void shouldReturnNumberOfStandbyTasksThatWereNotAssigned() { - final Map tasksToRemainingStandbys = computeTasksToRemainingStandbys(3, ACTIVE_TASKS); + final Logger logMock = mock(Logger.class); + final int numStandbyReplicas = 3; + final Map tasksToRemainingStandbys = computeTasksToRemainingStandbys(numStandbyReplicas, ACTIVE_TASKS); - assertTrue(tasksToRemainingStandbys.keySet() - .stream() - .map(taskId -> pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks( - clients, - tasksToRemainingStandbys, - clientsByTaskLoad, - taskId - )) - .allMatch(numRemainingStandbys -> numRemainingStandbys == 1)); + tasksToRemainingStandbys.keySet().forEach(taskId -> pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks(numStandbyReplicas, + clients, + tasksToRemainingStandbys, + clientsByTaskLoad, + taskId, + logMock)); assertTrue(ACTIVE_TASKS.stream().allMatch(activeTask -> tasksToRemainingStandbys.get(activeTask) == 1)); assertTrue(areStandbyTasksPresentForAllActiveTasks(2)); + verify(logMock, times(ACTIVE_TASKS.size())).warn(anyString(), anyInt(), anyInt(), any()); } @Test public void shouldReturnZeroWhenAllStandbyTasksWereSuccessfullyAssigned() { - final Map tasksToRemainingStandbys = computeTasksToRemainingStandbys(1, ACTIVE_TASKS); + final Logger 
logMock = mock(Logger.class); + final int numStandbyReplicas = 1; + final Map tasksToRemainingStandbys = computeTasksToRemainingStandbys(numStandbyReplicas, ACTIVE_TASKS); - assertTrue(tasksToRemainingStandbys.keySet() - .stream() - .map(taskId -> pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks( - clients, - tasksToRemainingStandbys, - clientsByTaskLoad, - taskId - )) - .allMatch(numRemainingStandbys -> numRemainingStandbys == 0)); + tasksToRemainingStandbys.keySet().forEach(taskId -> pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks(numStandbyReplicas, + clients, + tasksToRemainingStandbys, + clientsByTaskLoad, + taskId, + logMock)); assertTrue(ACTIVE_TASKS.stream().allMatch(activeTask -> tasksToRemainingStandbys.get(activeTask) == 0)); assertTrue(areStandbyTasksPresentForAllActiveTasks(1)); + verifyNoInteractions(logMock); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignorFactoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignorFactoryTest.java new file mode 100644 index 00000000000..fdd7fa1d473 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignorFactoryTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.assignment; + +import org.junit.Test; + +import java.util.Collections; +import java.util.List; + +import static java.util.Collections.singletonList; +import static org.junit.Assert.assertTrue; + +public class StandbyTaskAssignorFactoryTest { + private static final long ACCEPTABLE_RECOVERY_LAG = 0L; + private static final int MAX_WARMUP_REPLICAS = 1; + private static final int NUMBER_OF_STANDBY_REPLICAS = 1; + private static final long PROBING_REBALANCE_INTERVAL_MS = 60000L; + + @Test + public void shouldReturnClientTagAwareStandbyTaskAssignorWhenRackAwareAssignmentTagsIsSet() { + final StandbyTaskAssignor standbyTaskAssignor = StandbyTaskAssignorFactory.create(newAssignmentConfigs(singletonList("az"))); + assertTrue(standbyTaskAssignor instanceof ClientTagAwareStandbyTaskAssignor); + } + + @Test + public void shouldReturnDefaultStandbyTaskAssignorWhenRackAwareAssignmentTagsIsEmpty() { + final StandbyTaskAssignor standbyTaskAssignor = StandbyTaskAssignorFactory.create(newAssignmentConfigs(Collections.emptyList())); + assertTrue(standbyTaskAssignor instanceof DefaultStandbyTaskAssignor); + } + + private static AssignorConfiguration.AssignmentConfigs newAssignmentConfigs(final List rackAwareAssignmentTags) { + return new AssignorConfiguration.AssignmentConfigs(ACCEPTABLE_RECOVERY_LAG, + MAX_WARMUP_REPLICAS, + NUMBER_OF_STANDBY_REPLICAS, + PROBING_REBALANCE_INTERVAL_MS, + rackAwareAssignmentTags); + } +} \ No newline at end of file