KAFKA-15045: (KIP-924 pt. 19) Update to new AssignmentConfigs (#16219)

This PR updates all of the streams task assignment code to use the new AssignmentConfigs public class.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
This commit is contained in:
Antoine Pourchet 2024-06-06 15:20:48 -06:00 committed by GitHub
parent 8a2bc3a221
commit ee834d9214
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
24 changed files with 118 additions and 160 deletions

View File

@ -82,7 +82,7 @@ public class AssignmentConfigs {
final long probingRebalanceIntervalMs,
final List<String> rackAwareAssignmentTags,
final OptionalInt rackAwareTrafficCost,
final OptionalInt rackAwareNonOverlapCost,
final OptionalInt rackAwareNonOverlapCost,
final String rackAwareAssignmentStrategy) {
this.acceptableRecoveryLag = validated(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, acceptableRecoveryLag);
this.maxWarmupReplicas = validated(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, maxWarmupReplicas);
@ -90,16 +90,36 @@ public class AssignmentConfigs {
this.probingRebalanceIntervalMs = validated(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, probingRebalanceIntervalMs);
this.rackAwareAssignmentTags = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, rackAwareAssignmentTags);
this.rackAwareTrafficCost = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG,
rackAwareTrafficCost
defaultRackAwareTrafficCost(rackAwareTrafficCost)
);
this.rackAwareNonOverlapCost = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG,
rackAwareNonOverlapCost
defaultRackAwareNonOverlapCost(rackAwareNonOverlapCost)
);
this.rackAwareAssignmentStrategy = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG,
rackAwareAssignmentStrategy
);
}
public AssignmentConfigs(final long acceptableRecoveryLag,
final int maxWarmupReplicas,
final int numStandbyReplicas,
final long probingRebalanceIntervalMs,
final List<String> rackAwareAssignmentTags,
final int rackAwareTrafficCost,
final int rackAwareNonOverlapCost,
final String rackAwareAssignmentStrategy) {
this(acceptableRecoveryLag, maxWarmupReplicas, numStandbyReplicas, probingRebalanceIntervalMs, rackAwareAssignmentTags,
OptionalInt.of(rackAwareTrafficCost), OptionalInt.of(rackAwareNonOverlapCost), rackAwareAssignmentStrategy);
}
public AssignmentConfigs(final Long acceptableRecoveryLag,
final Integer maxWarmupReplicas,
final Integer numStandbyReplicas,
final Long probingRebalanceIntervalMs,
final List<String> rackAwareAssignmentTags) {
this(acceptableRecoveryLag, maxWarmupReplicas, numStandbyReplicas, probingRebalanceIntervalMs, rackAwareAssignmentTags,
OptionalInt.empty(), OptionalInt.empty(), StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE);
}
/**
* The configured acceptable recovery lag according to
@ -186,4 +206,18 @@ public class AssignmentConfigs {
"\n rackAwareAssignmentStrategy=" + rackAwareAssignmentStrategy +
"\n}";
}
private static OptionalInt defaultRackAwareTrafficCost(final OptionalInt rackAwareTrafficCost) {
if (rackAwareTrafficCost == null) {
return OptionalInt.empty();
}
return rackAwareTrafficCost;
}
private static OptionalInt defaultRackAwareNonOverlapCost(final OptionalInt rackAwareNonOverlapCost) {
if (rackAwareNonOverlapCost == null) {
return OptionalInt.empty();
}
return rackAwareNonOverlapCost;
}
}

View File

@ -43,6 +43,7 @@ import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.ApplicationState;
import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment;
import org.apache.kafka.streams.processor.assignment.KafkaStreamsState;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.apache.kafka.streams.processor.assignment.TaskAssignor.AssignmentError;
import org.apache.kafka.streams.processor.assignment.TaskInfo;
import org.apache.kafka.streams.processor.assignment.ProcessId;
@ -53,7 +54,6 @@ import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.Topi
import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentListener;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
@ -206,7 +206,6 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
private String userEndPoint;
private AssignmentConfigs assignmentConfigs;
private org.apache.kafka.streams.processor.assignment.AssignmentConfigs publicAssignmentConfigs;
// for the main consumer, we need to use a supplier to break a cyclic setup dependency
private Supplier<Consumer<byte[], byte[]>> mainConsumerSupplier;
@ -257,7 +256,6 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
nonFatalExceptionsToHandle = referenceContainer.nonFatalExceptionsToHandle;
time = Objects.requireNonNull(referenceContainer.time, "Time was not specified");
assignmentConfigs = assignorConfiguration.assignmentConfigs();
publicAssignmentConfigs = assignorConfiguration.publicAssignmentConfigs();
partitionGrouper = new PartitionGrouper();
userEndPoint = assignorConfiguration.userEndPoint();
internalTopicManager = assignorConfiguration.internalTopicManager();
@ -582,7 +580,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
));
return new DefaultApplicationState(
publicAssignmentConfigs,
assignmentConfigs,
logicalTasks,
clientMetadataMap
);
@ -1760,19 +1758,19 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
}
long acceptableRecoveryLag() {
return assignmentConfigs.acceptableRecoveryLag;
return assignmentConfigs.acceptableRecoveryLag();
}
int maxWarmupReplicas() {
return assignmentConfigs.maxWarmupReplicas;
return assignmentConfigs.maxWarmupReplicas();
}
int numStandbyReplicas() {
return assignmentConfigs.numStandbyReplicas;
return assignmentConfigs.numStandbyReplicas();
}
long probingRebalanceIntervalMs() {
return assignmentConfigs.probingRebalanceIntervalMs;
return assignmentConfigs.probingRebalanceIntervalMs();
}
}

View File

@ -20,18 +20,17 @@ import java.util.Optional;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsConfig.InternalConfig;
import org.apache.kafka.streams.internals.UpgradeFromValues;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.slf4j.Logger;
import java.util.List;
import java.util.Map;
import static org.apache.kafka.common.utils.Utils.getHost;
@ -240,11 +239,7 @@ public final class AssignorConfiguration {
}
public AssignmentConfigs assignmentConfigs() {
return new AssignmentConfigs(streamsConfig);
}
public org.apache.kafka.streams.processor.assignment.AssignmentConfigs publicAssignmentConfigs() {
return org.apache.kafka.streams.processor.assignment.AssignmentConfigs.of(streamsConfig);
return AssignmentConfigs.of(streamsConfig);
}
public TaskAssignor taskAssignor() {
@ -297,72 +292,4 @@ public final class AssignorConfiguration {
public interface AssignmentListener {
void onAssignmentComplete(final boolean stable);
}
public static class AssignmentConfigs {
public final long acceptableRecoveryLag;
public final int maxWarmupReplicas;
public final int numStandbyReplicas;
public final long probingRebalanceIntervalMs;
public final List<String> rackAwareAssignmentTags;
public final Integer rackAwareAssignmentTrafficCost;
public final Integer rackAwareAssignmentNonOverlapCost;
public final String rackAwareAssignmentStrategy;
private AssignmentConfigs(final StreamsConfig configs) {
acceptableRecoveryLag = configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG);
maxWarmupReplicas = configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG);
numStandbyReplicas = configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
probingRebalanceIntervalMs = configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG);
rackAwareAssignmentTags = configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG);
rackAwareAssignmentTrafficCost = configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG);
rackAwareAssignmentNonOverlapCost = configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG);
rackAwareAssignmentStrategy = configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG);
}
AssignmentConfigs(final Long acceptableRecoveryLag,
final Integer maxWarmupReplicas,
final Integer numStandbyReplicas,
final Long probingRebalanceIntervalMs,
final List<String> rackAwareAssignmentTags) {
this(acceptableRecoveryLag, maxWarmupReplicas, numStandbyReplicas, probingRebalanceIntervalMs, rackAwareAssignmentTags,
null, null, StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE);
}
AssignmentConfigs(final Long acceptableRecoveryLag,
final Integer maxWarmupReplicas,
final Integer numStandbyReplicas,
final Long probingRebalanceIntervalMs,
final List<String> rackAwareAssignmentTags,
final Integer rackAwareAssignmentTrafficCost,
final Integer rackAwareAssignmentNonOverlapCost,
final String rackAwareAssignmentStrategy) {
this.acceptableRecoveryLag = validated(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, acceptableRecoveryLag);
this.maxWarmupReplicas = validated(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, maxWarmupReplicas);
this.numStandbyReplicas = validated(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, numStandbyReplicas);
this.probingRebalanceIntervalMs = validated(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, probingRebalanceIntervalMs);
this.rackAwareAssignmentTags = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, rackAwareAssignmentTags);
this.rackAwareAssignmentTrafficCost = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, rackAwareAssignmentTrafficCost);
this.rackAwareAssignmentNonOverlapCost = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG, rackAwareAssignmentNonOverlapCost);
this.rackAwareAssignmentStrategy = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, rackAwareAssignmentStrategy);
}
private static <T> T validated(final String configKey, final T value) {
final ConfigDef.Validator validator = StreamsConfig.configDef().configKeys().get(configKey).validator;
if (validator != null) {
validator.ensureValid(configKey, value);
}
return value;
}
@Override
public String toString() {
return "AssignmentConfigs{" +
"\n acceptableRecoveryLag=" + acceptableRecoveryLag +
"\n maxWarmupReplicas=" + maxWarmupReplicas +
"\n numStandbyReplicas=" + numStandbyReplicas +
"\n probingRebalanceIntervalMs=" + probingRebalanceIntervalMs +
"\n rackAwareAssignmentTags=" + rackAwareAssignmentTags +
"\n}";
}
}
}

View File

@ -23,7 +23,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -52,7 +52,9 @@ class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor {
private final Function<AssignmentConfigs, List<String>> tagsFunction;
public ClientTagAwareStandbyTaskAssignor() {
this((uuid, clientState) -> clientState.clientTags(), assignmentConfigs -> assignmentConfigs.rackAwareAssignmentTags);
this((uuid, clientState) -> clientState.clientTags(),
AssignmentConfigs::rackAwareAssignmentTags
);
}
public ClientTagAwareStandbyTaskAssignor(final BiFunction<UUID, ClientState, Map<String, String>> clientTagFunction,
@ -63,8 +65,8 @@ class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor {
/**
* The algorithm distributes standby tasks for the {@param statefulTaskIds} over different tag dimensions.
* For each stateful task, the number of standby tasks will be assigned based on configured {@link AssignmentConfigs#numStandbyReplicas}.
* Rack aware standby tasks distribution only takes into account tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags}.
* For each stateful task, the number of standby tasks will be assigned based on configured {@link AssignmentConfigs#numStandbyReplicas()}.
* Rack aware standby tasks distribution only takes into account tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags()}.
* Ideally, all standby tasks for any given stateful task will be located on different tag dimensions to have the best possible distribution.
* However, if the ideal (or partially ideal) distribution is impossible, the algorithm will fall back to the least-loaded clients without taking rack awareness constraints into consideration.
* The least-loaded clients are determined based on the total number of tasks (active and standby tasks) assigned to the client.
@ -73,8 +75,8 @@ class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor {
public boolean assign(final Map<UUID, ClientState> clients,
final Set<TaskId> allTaskIds,
final Set<TaskId> statefulTaskIds,
final AssignorConfiguration.AssignmentConfigs configs) {
final int numStandbyReplicas = configs.numStandbyReplicas;
final AssignmentConfigs configs) {
final int numStandbyReplicas = configs.numStandbyReplicas();
final Set<String> rackAwareAssignmentTags = new HashSet<>(tagsFunction.apply(configs));
final Map<TaskId, Integer> tasksToRemainingStandbys = computeTasksToRemainingStandbys(

View File

@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals.assignment;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -40,8 +41,8 @@ class DefaultStandbyTaskAssignor implements StandbyTaskAssignor {
public boolean assign(final Map<UUID, ClientState> clients,
final Set<TaskId> allTaskIds,
final Set<TaskId> statefulTaskIds,
final AssignorConfiguration.AssignmentConfigs configs) {
final int numStandbyReplicas = configs.numStandbyReplicas;
final AssignmentConfigs configs) {
final int numStandbyReplicas = configs.numStandbyReplicas();
final Map<TaskId, Integer> tasksToRemainingStandbys = computeTasksToRemainingStandbys(numStandbyReplicas,
statefulTaskIds);

View File

@ -17,11 +17,11 @@
package org.apache.kafka.streams.processor.internals.assignment;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
/**
* A special task assignor implementation to be used as a fallback in case the

View File

@ -17,8 +17,8 @@
package org.apache.kafka.streams.processor.internals.assignment;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -67,12 +67,12 @@ public class HighAvailabilityTaskAssignor implements TaskAssignor {
configs
);
final AtomicInteger remainingWarmupReplicas = new AtomicInteger(configs.maxWarmupReplicas);
final AtomicInteger remainingWarmupReplicas = new AtomicInteger(configs.maxWarmupReplicas());
final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients = tasksToCaughtUpClients(
statefulTasks,
clientStates,
configs.acceptableRecoveryLag
configs.acceptableRecoveryLag()
);
final Map<TaskId, SortedSet<UUID>> tasksToClientByLag = tasksToClientByLag(statefulTasks, clientStates);
@ -134,10 +134,8 @@ public class HighAvailabilityTaskAssignor implements TaskAssignor {
);
if (rackAwareTaskAssignor.canEnableRackAwareAssignor()) {
final int trafficCost = configs.rackAwareAssignmentTrafficCost == null ?
DEFAULT_HIGH_AVAILABILITY_TRAFFIC_COST : configs.rackAwareAssignmentTrafficCost;
final int nonOverlapCost = configs.rackAwareAssignmentNonOverlapCost == null ?
DEFAULT_HIGH_AVAILABILITY_NON_OVERLAP_COST : configs.rackAwareAssignmentNonOverlapCost;
final int trafficCost = configs.rackAwareTrafficCost().orElse(DEFAULT_HIGH_AVAILABILITY_TRAFFIC_COST);
final int nonOverlapCost = configs.rackAwareNonOverlapCost().orElse(DEFAULT_HIGH_AVAILABILITY_NON_OVERLAP_COST);
rackAwareTaskAssignor.optimizeActiveTasks(statefulTasks, clientStates, trafficCost, nonOverlapCost);
}
}
@ -147,7 +145,7 @@ public class HighAvailabilityTaskAssignor implements TaskAssignor {
final Set<TaskId> statefulTasks,
final RackAwareTaskAssignor rackAwareTaskAssignor,
final AssignmentConfigs configs) {
if (configs.numStandbyReplicas == 0) {
if (configs.numStandbyReplicas() == 0) {
return;
}
@ -164,10 +162,8 @@ public class HighAvailabilityTaskAssignor implements TaskAssignor {
);
if (rackAwareTaskAssignor.canEnableRackAwareAssignor()) {
final int trafficCost = configs.rackAwareAssignmentTrafficCost == null ?
DEFAULT_HIGH_AVAILABILITY_TRAFFIC_COST : configs.rackAwareAssignmentTrafficCost;
final int nonOverlapCost = configs.rackAwareAssignmentNonOverlapCost == null ?
DEFAULT_HIGH_AVAILABILITY_NON_OVERLAP_COST : configs.rackAwareAssignmentNonOverlapCost;
final int trafficCost = configs.rackAwareTrafficCost().orElse(DEFAULT_HIGH_AVAILABILITY_TRAFFIC_COST);
final int nonOverlapCost = configs.rackAwareNonOverlapCost().orElse(DEFAULT_HIGH_AVAILABILITY_NON_OVERLAP_COST);
rackAwareTaskAssignor.optimizeStandbyTasks(clientStates, trafficCost, nonOverlapCost, standbyTaskAssignor::isAllowedTaskMovement);
}
}

View File

@ -25,14 +25,14 @@ import java.util.Set;
import java.util.TreeMap;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
public class RackAwareGraphConstructorFactory {
static <T> RackAwareGraphConstructor<T> create(final AssignmentConfigs assignmentConfigs,
final Map<Subtopology, Set<TaskId>> tasksForTopicGroup) {
return create(assignmentConfigs.rackAwareAssignmentStrategy, new ArrayList<>(new TreeMap<>(tasksForTopicGroup).values()));
return create(assignmentConfigs.rackAwareAssignmentStrategy(), new ArrayList<>(new TreeMap<>(tasksForTopicGroup).values()));
}
public static <T> RackAwareGraphConstructor<T> create(final String rackAwareAssignmentStrategy,

View File

@ -43,9 +43,9 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -118,14 +118,14 @@ public class RackAwareTaskAssignor {
}
public synchronized boolean canEnableRackAwareAssignor() {
if (StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy)) {
if (StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy())) {
return false;
}
if (canEnable != null) {
return canEnable;
}
canEnable = validClientRack && validateTopicPartitionRack(false);
if (assignmentConfigs.numStandbyReplicas == 0 || !canEnable) {
if (assignmentConfigs.numStandbyReplicas() == 0 || !canEnable) {
return canEnable;
}
@ -240,7 +240,7 @@ public class RackAwareTaskAssignor {
KeyValue<String, String> previousRackInfo = null;
for (final Map.Entry<String, Optional<String>> rackEntry : entry.getValue().entrySet()) {
if (!rackEntry.getValue().isPresent()) {
if (!StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy)) {
if (!StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy())) {
log.error(
String.format("RackId doesn't exist for process %s and consumer %s",
processId, rackEntry.getKey()));
@ -263,7 +263,7 @@ public class RackAwareTaskAssignor {
}
}
if (previousRackInfo == null) {
if (!StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy)) {
if (!StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy())) {
log.error(String.format("RackId doesn't exist for process %s", processId));
}
return false;

View File

@ -20,7 +20,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
interface StandbyTaskAssignor extends TaskAssignor {
default boolean isAllowedTaskMovement(final ClientState source, final ClientState destination) {
@ -53,5 +53,5 @@ interface StandbyTaskAssignor extends TaskAssignor {
boolean assign(final Map<UUID, ClientState> clients,
final Set<TaskId> allTaskIds,
final Set<TaskId> statefulTaskIds,
final AssignorConfiguration.AssignmentConfigs configs);
final AssignmentConfigs configs);
}

View File

@ -22,13 +22,14 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
class StandbyTaskAssignorFactory {
private StandbyTaskAssignorFactory() {}
static StandbyTaskAssignor create(final AssignorConfiguration.AssignmentConfigs configs,
static StandbyTaskAssignor create(final AssignmentConfigs configs,
final RackAwareTaskAssignor rackAwareTaskAssignor) {
if (!configs.rackAwareAssignmentTags.isEmpty()) {
if (!configs.rackAwareAssignmentTags().isEmpty()) {
return new ClientTagAwareStandbyTaskAssignor();
} else if (rackAwareTaskAssignor != null && rackAwareTaskAssignor.validClientRack()) {
// racksForProcess should be populated if rackAwareTaskAssignor isn't null

View File

@ -24,7 +24,7 @@ import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -85,17 +85,15 @@ public class StickyTaskAssignor implements TaskAssignor {
assignActive();
optimizeActive();
assignStandby(configs.numStandbyReplicas);
assignStandby(configs.numStandbyReplicas());
optimizeStandby();
return false;
}
private void optimizeStandby() {
if (configs.numStandbyReplicas > 0 && rackAwareTaskAssignor != null && rackAwareTaskAssignor.canEnableRackAwareAssignor()) {
final int trafficCost = configs.rackAwareAssignmentTrafficCost == null ?
DEFAULT_STATEFUL_TRAFFIC_COST : configs.rackAwareAssignmentTrafficCost;
final int nonOverlapCost = configs.rackAwareAssignmentNonOverlapCost == null ?
DEFAULT_STATEFUL_NON_OVERLAP_COST : configs.rackAwareAssignmentNonOverlapCost;
if (configs.numStandbyReplicas() > 0 && rackAwareTaskAssignor != null && rackAwareTaskAssignor.canEnableRackAwareAssignor()) {
final int trafficCost = configs.rackAwareTrafficCost().orElse(DEFAULT_STATEFUL_TRAFFIC_COST);
final int nonOverlapCost = configs.rackAwareNonOverlapCost().orElse(DEFAULT_STATEFUL_NON_OVERLAP_COST);
final TreeMap<UUID, ClientState> clientStates = new TreeMap<>(clients);
rackAwareTaskAssignor.optimizeStandbyTasks(clientStates, trafficCost, nonOverlapCost, (s, d, t, c) -> true);
@ -104,10 +102,8 @@ public class StickyTaskAssignor implements TaskAssignor {
private void optimizeActive() {
if (rackAwareTaskAssignor != null && rackAwareTaskAssignor.canEnableRackAwareAssignor()) {
final int trafficCost = configs.rackAwareAssignmentTrafficCost == null ?
DEFAULT_STATEFUL_TRAFFIC_COST : configs.rackAwareAssignmentTrafficCost;
final int nonOverlapCost = configs.rackAwareAssignmentNonOverlapCost == null ?
DEFAULT_STATEFUL_NON_OVERLAP_COST : configs.rackAwareAssignmentNonOverlapCost;
final int trafficCost = configs.rackAwareTrafficCost().orElse(DEFAULT_STATEFUL_TRAFFIC_COST);
final int nonOverlapCost = configs.rackAwareNonOverlapCost().orElse(DEFAULT_STATEFUL_NON_OVERLAP_COST);
final SortedSet<TaskId> statefulTasks = new TreeSet<>(statefulTaskIds);
final TreeMap<UUID, ClientState> clientStates = new TreeMap<>(clients);

View File

@ -21,7 +21,7 @@ import org.apache.kafka.streams.processor.TaskId;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
public interface TaskAssignor {
/**

View File

@ -25,9 +25,9 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentListener;
import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
@ -147,7 +147,7 @@ public class TaskAssignorIntegrationTest {
final Field assignmentConfigs = StreamsPartitionAssignor.class.getDeclaredField("assignmentConfigs");
assignmentConfigs.setAccessible(true);
final AssignorConfiguration.AssignmentConfigs configs = (AssignorConfiguration.AssignmentConfigs) assignmentConfigs.get(streamsPartitionAssignor);
final AssignmentConfigs configs = (AssignmentConfigs) assignmentConfigs.get(streamsPartitionAssignor);
final Field assignmentListenerField = StreamsPartitionAssignor.class.getDeclaredField("assignmentListener");
assignmentListenerField.setAccessible(true);
@ -159,10 +159,10 @@ public class TaskAssignorIntegrationTest {
(Supplier<TaskAssignor>) taskAssignorSupplierField.get(streamsPartitionAssignor);
final TaskAssignor taskAssignor = taskAssignorSupplier.get();
assertThat(configs.numStandbyReplicas, is(5));
assertThat(configs.acceptableRecoveryLag, is(6L));
assertThat(configs.maxWarmupReplicas, is(7));
assertThat(configs.probingRebalanceIntervalMs, is(480000L));
assertThat(configs.numStandbyReplicas(), is(5));
assertThat(configs.acceptableRecoveryLag(), is(6L));
assertThat(configs.maxWarmupReplicas(), is(7));
assertThat(configs.probingRebalanceIntervalMs(), is(480000L));
assertThat(actualAssignmentListener, sameInstance(configuredAssignmentListener));
assertThat(taskAssignor, instanceOf(MyTaskAssignor.class));
}

View File

@ -39,11 +39,11 @@ import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsConfig.InternalConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockInternalTopicManager;
import org.hamcrest.BaseMatcher;

View File

@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals.assignment;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.internals.UpgradeFromValues;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.junit.Before;
import org.junit.Test;
@ -45,7 +46,7 @@ public class AssignorConfigurationTest {
public void configsShouldRejectZeroWarmups() {
final ConfigException exception = assertThrows(
ConfigException.class,
() -> new AssignorConfiguration.AssignmentConfigs(1L, 0, 1, 1L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
() -> new AssignmentConfigs(1L, 0, 1, 1L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertThat(exception.getMessage(), containsString("Invalid value 0 for configuration max.warmup.replicas"));

View File

@ -17,7 +17,7 @@
package org.apache.kafka.streams.processor.internals.assignment;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.apache.kafka.streams.processor.internals.assignment.ClientTagAwareStandbyTaskAssignor.TagEntry;
import org.junit.Before;
import org.junit.Test;

View File

@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals.assignment;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.junit.Test;
import java.util.Collections;
@ -55,7 +56,7 @@ public class FallbackPriorTaskAssignorTest {
new HashSet<>(taskIds),
new HashSet<>(taskIds),
null,
new AssignorConfiguration.AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS)
);
assertThat(probingRebalanceNeeded, is(true));

View File

@ -27,9 +27,9 @@ import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Test;
@ -1459,7 +1459,7 @@ public class HighAvailabilityTaskAssignorTest {
final Map<UUID, Map<String, Optional<String>>> processRackMap = getRandomProcessRacks(clientSize, nodeSize);
final InternalTopicManager mockInternalTopicManager = mockInternalTopicManagerForRandomChangelog(nodeSize, tpSize, partitionSize);
AssignmentConfigs configs = new AssignorConfiguration.AssignmentConfigs(
AssignmentConfigs configs = new AssignmentConfigs(
0L,
1,
replicaCount,
@ -1508,7 +1508,7 @@ public class HighAvailabilityTaskAssignorTest {
}
final SortedMap<UUID, ClientState> clientStateMapCopy = copyClientStateMap(clientStateMap);
configs = new AssignorConfiguration.AssignmentConfigs(
configs = new AssignmentConfigs(
0L,
1,
replicaCount,

View File

@ -23,7 +23,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.junit.jupiter.api.Test;
public class RackAwareGraphConstructorFactoryTest {

View File

@ -108,8 +108,8 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockInternalTopicManager;
import org.junit.Before;

View File

@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals.assignment;
import java.util.Arrays;
import java.util.Collection;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.junit.Before;
import org.junit.Test;
@ -103,11 +104,11 @@ public class StandbyTaskAssignorFactoryTest {
}
}
private static AssignorConfiguration.AssignmentConfigs newAssignmentConfigs(final List<String> rackAwareAssignmentTags) {
return new AssignorConfiguration.AssignmentConfigs(ACCEPTABLE_RECOVERY_LAG,
MAX_WARMUP_REPLICAS,
NUMBER_OF_STANDBY_REPLICAS,
PROBING_REBALANCE_INTERVAL_MS,
rackAwareAssignmentTags);
private static AssignmentConfigs newAssignmentConfigs(final List<String> rackAwareAssignmentTags) {
return new AssignmentConfigs(ACCEPTABLE_RECOVERY_LAG,
MAX_WARMUP_REPLICAS,
NUMBER_OF_STANDBY_REPLICAS,
PROBING_REBALANCE_INTERVAL_MS,
rackAwareAssignmentTags);
}
}

View File

@ -26,9 +26,9 @@ import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import org.junit.Before;
import org.junit.Test;
@ -463,7 +463,7 @@ public class StickyTaskAssignorTest {
final Map<TaskId, Set<TopicPartition>> changelogPartitionsForTask = getTaskTopicPartitionMap(topicSize, partitionSize, true);
final Map<UUID, Map<String, Optional<String>>> racksForProcessConsumer = getRandomProcessRacks(clientSize, nodeSize);
final InternalTopicManager internalTopicManager = mockInternalTopicManagerForRandomChangelog(nodeSize, topicSize, partitionSize);
final AssignmentConfigs configs = new AssignorConfiguration.AssignmentConfigs(
final AssignmentConfigs configs = new AssignmentConfigs(
0L,
1,
1,
@ -786,7 +786,7 @@ public class StickyTaskAssignorTest {
final Map<UUID, Map<String, Optional<String>>> racksForProcessConsumer = getRandomProcessRacks(clientSize, nodeSize);
final InternalTopicManager internalTopicManager = mockInternalTopicManagerForRandomChangelog(nodeSize, topicSize, partitionSize);
final AssignmentConfigs configs = new AssignorConfiguration.AssignmentConfigs(
final AssignmentConfigs configs = new AssignmentConfigs(
0L,
1,
0,
@ -836,7 +836,7 @@ public class StickyTaskAssignorTest {
final Map<UUID, Map<String, Optional<String>>> racksForProcessConsumer = getProcessRacksForAllProcess();
final InternalTopicManager internalTopicManager = mockInternalTopicManagerForChangelog();
final AssignmentConfigs configs = new AssignorConfiguration.AssignmentConfigs(
final AssignmentConfigs configs = new AssignmentConfigs(
0L,
1,
1,
@ -907,7 +907,7 @@ public class StickyTaskAssignorTest {
final int maxCapacity = 3;
final SortedMap<TaskId, Set<TopicPartition>> taskTopicPartitionMap = getTaskTopicPartitionMap(
tpSize, partitionSize, false);
final AssignmentConfigs configs = new AssignorConfiguration.AssignmentConfigs(
final AssignmentConfigs configs = new AssignmentConfigs(
0L,
1,
replicaCount,
@ -976,7 +976,7 @@ public class StickyTaskAssignorTest {
final Map<UUID, Map<String, Optional<String>>> processRackMap = getRandomProcessRacks(clientSize, nodeSize);
final InternalTopicManager mockInternalTopicManager = mockInternalTopicManagerForRandomChangelog(nodeSize, tpSize, partitionSize);
AssignmentConfigs configs = new AssignorConfiguration.AssignmentConfigs(
AssignmentConfigs configs = new AssignmentConfigs(
0L,
1,
replicaCount,
@ -1025,7 +1025,7 @@ public class StickyTaskAssignorTest {
}
final SortedMap<UUID, ClientState> clientStateMapCopy = copyClientStateMap(clientStateMap);
configs = new AssignorConfiguration.AssignmentConfigs(
configs = new AssignmentConfigs(
0L,
1,
replicaCount,
@ -1068,7 +1068,7 @@ public class StickyTaskAssignorTest {
private boolean assign(final int numStandbys, final TaskId... tasks) {
final List<TaskId> taskIds = asList(tasks);
Collections.shuffle(taskIds);
final AssignmentConfigs configs = new AssignorConfiguration.AssignmentConfigs(
final AssignmentConfigs configs = new AssignmentConfigs(
0L,
1,
numStandbys,

View File

@ -33,9 +33,9 @@ import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockInternalTopicManager;
import org.junit.Before;