diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java index c07baffcddb..6a7ca68a50f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/AssignmentConfigs.java @@ -82,7 +82,7 @@ public class AssignmentConfigs { final long probingRebalanceIntervalMs, final List rackAwareAssignmentTags, final OptionalInt rackAwareTrafficCost, - final OptionalInt rackAwareNonOverlapCost, + final OptionalInt rackAwareNonOverlapCost, final String rackAwareAssignmentStrategy) { this.acceptableRecoveryLag = validated(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, acceptableRecoveryLag); this.maxWarmupReplicas = validated(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, maxWarmupReplicas); @@ -90,16 +90,36 @@ public class AssignmentConfigs { this.probingRebalanceIntervalMs = validated(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, probingRebalanceIntervalMs); this.rackAwareAssignmentTags = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, rackAwareAssignmentTags); this.rackAwareTrafficCost = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, - rackAwareTrafficCost + defaultRackAwareTrafficCost(rackAwareTrafficCost) ); this.rackAwareNonOverlapCost = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG, - rackAwareNonOverlapCost + defaultRackAwareNonOverlapCost(rackAwareNonOverlapCost) ); this.rackAwareAssignmentStrategy = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, rackAwareAssignmentStrategy ); } + public AssignmentConfigs(final long acceptableRecoveryLag, + final int maxWarmupReplicas, + final int numStandbyReplicas, + final long probingRebalanceIntervalMs, + final List rackAwareAssignmentTags, + final int rackAwareTrafficCost, + final int rackAwareNonOverlapCost, + final String rackAwareAssignmentStrategy) { + this(acceptableRecoveryLag, maxWarmupReplicas, numStandbyReplicas, probingRebalanceIntervalMs, rackAwareAssignmentTags, + OptionalInt.of(rackAwareTrafficCost), OptionalInt.of(rackAwareNonOverlapCost), rackAwareAssignmentStrategy); + } + + public AssignmentConfigs(final Long acceptableRecoveryLag, + final Integer maxWarmupReplicas, + final Integer numStandbyReplicas, + final Long probingRebalanceIntervalMs, + final List rackAwareAssignmentTags) { + this(acceptableRecoveryLag, maxWarmupReplicas, numStandbyReplicas, probingRebalanceIntervalMs, rackAwareAssignmentTags, + OptionalInt.empty(), OptionalInt.empty(), StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE); + } /** * The configured acceptable recovery lag according to @@ -186,4 +206,18 @@ public class AssignmentConfigs { "\n rackAwareAssignmentStrategy=" + rackAwareAssignmentStrategy + "\n}"; } + + private static OptionalInt defaultRackAwareTrafficCost(final OptionalInt rackAwareTrafficCost) { + if (rackAwareTrafficCost == null) { + return OptionalInt.empty(); + } + return rackAwareTrafficCost; + } + + private static OptionalInt defaultRackAwareNonOverlapCost(final OptionalInt rackAwareNonOverlapCost) { + if (rackAwareNonOverlapCost == null) { + return OptionalInt.empty(); + } + return rackAwareNonOverlapCost; + } } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 53354b3844d..1f5a5bb05cf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -43,6 +43,7 @@ import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.assignment.ApplicationState; import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment; import org.apache.kafka.streams.processor.assignment.KafkaStreamsState; +import org.apache.kafka.streams.processor.assignment.AssignmentConfigs; import org.apache.kafka.streams.processor.assignment.TaskAssignor.AssignmentError; import org.apache.kafka.streams.processor.assignment.TaskInfo; import org.apache.kafka.streams.processor.assignment.ProcessId; @@ -53,7 +54,6 @@ import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.Topi import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo; import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration; -import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentListener; import org.apache.kafka.streams.processor.internals.assignment.AssignorError; import org.apache.kafka.streams.processor.internals.assignment.ClientState; @@ -206,7 +206,6 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf private String userEndPoint; private AssignmentConfigs assignmentConfigs; - private org.apache.kafka.streams.processor.assignment.AssignmentConfigs publicAssignmentConfigs; // for the main consumer, we need to use a supplier to break a cyclic setup dependency private Supplier> mainConsumerSupplier; @@ -257,7 +256,6 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf nonFatalExceptionsToHandle = referenceContainer.nonFatalExceptionsToHandle; time = Objects.requireNonNull(referenceContainer.time, "Time was not specified"); assignmentConfigs = assignorConfiguration.assignmentConfigs(); - publicAssignmentConfigs = assignorConfiguration.publicAssignmentConfigs(); partitionGrouper = new PartitionGrouper(); userEndPoint = assignorConfiguration.userEndPoint(); internalTopicManager = assignorConfiguration.internalTopicManager(); @@ -582,7 +580,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf )); return new DefaultApplicationState( - publicAssignmentConfigs, + assignmentConfigs, logicalTasks, clientMetadataMap ); @@ -1760,19 +1758,19 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf } long acceptableRecoveryLag() { - return assignmentConfigs.acceptableRecoveryLag; + return assignmentConfigs.acceptableRecoveryLag(); } int maxWarmupReplicas() { - return assignmentConfigs.maxWarmupReplicas; + return assignmentConfigs.maxWarmupReplicas(); } int numStandbyReplicas() { - return assignmentConfigs.numStandbyReplicas; + return assignmentConfigs.numStandbyReplicas(); } long probingRebalanceIntervalMs() { - return assignmentConfigs.probingRebalanceIntervalMs; + return assignmentConfigs.probingRebalanceIntervalMs(); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java index 355f8736b3b..7004ede98b3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java @@ -20,18 +20,17 @@ import java.util.Optional; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol; import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig.InternalConfig; import org.apache.kafka.streams.internals.UpgradeFromValues; +import org.apache.kafka.streams.processor.assignment.AssignmentConfigs; import org.apache.kafka.streams.processor.internals.ClientUtils; import org.apache.kafka.streams.processor.internals.InternalTopicManager; import org.slf4j.Logger; -import java.util.List; import java.util.Map; import static org.apache.kafka.common.utils.Utils.getHost; @@ -240,11 +239,7 @@ public final class AssignorConfiguration { } public AssignmentConfigs assignmentConfigs() { - return new AssignmentConfigs(streamsConfig); - } - - public org.apache.kafka.streams.processor.assignment.AssignmentConfigs publicAssignmentConfigs() { - return org.apache.kafka.streams.processor.assignment.AssignmentConfigs.of(streamsConfig); + return AssignmentConfigs.of(streamsConfig); } public TaskAssignor taskAssignor() { @@ -297,72 +292,4 @@ public final class AssignorConfiguration { public interface AssignmentListener { void onAssignmentComplete(final boolean stable); } - - public static class AssignmentConfigs { - public final long acceptableRecoveryLag; - public final int maxWarmupReplicas; - public final int numStandbyReplicas; - public final long probingRebalanceIntervalMs; - public final List rackAwareAssignmentTags; - public final Integer rackAwareAssignmentTrafficCost; - public final Integer rackAwareAssignmentNonOverlapCost; - public final String rackAwareAssignmentStrategy; - - private AssignmentConfigs(final StreamsConfig configs) { - acceptableRecoveryLag = configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG); - maxWarmupReplicas = configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG); - numStandbyReplicas = configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG); - probingRebalanceIntervalMs = configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG); - rackAwareAssignmentTags = configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG); - rackAwareAssignmentTrafficCost = configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG); - rackAwareAssignmentNonOverlapCost = configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG); - rackAwareAssignmentStrategy = configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG); - } - - AssignmentConfigs(final Long acceptableRecoveryLag, - final Integer maxWarmupReplicas, - final Integer numStandbyReplicas, - final Long probingRebalanceIntervalMs, - final List rackAwareAssignmentTags) { - this(acceptableRecoveryLag, maxWarmupReplicas, numStandbyReplicas, probingRebalanceIntervalMs, rackAwareAssignmentTags, - null, null, StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE); - } - - AssignmentConfigs(final Long acceptableRecoveryLag, - final Integer maxWarmupReplicas, - final Integer numStandbyReplicas, - final Long probingRebalanceIntervalMs, - final List rackAwareAssignmentTags, - final Integer rackAwareAssignmentTrafficCost, - final Integer rackAwareAssignmentNonOverlapCost, - final String rackAwareAssignmentStrategy) { - this.acceptableRecoveryLag = validated(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, acceptableRecoveryLag); - this.maxWarmupReplicas = validated(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, maxWarmupReplicas); - this.numStandbyReplicas = validated(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, numStandbyReplicas); - this.probingRebalanceIntervalMs = validated(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, probingRebalanceIntervalMs); - this.rackAwareAssignmentTags = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, rackAwareAssignmentTags); - this.rackAwareAssignmentTrafficCost = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, rackAwareAssignmentTrafficCost); - this.rackAwareAssignmentNonOverlapCost = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG, rackAwareAssignmentNonOverlapCost); - this.rackAwareAssignmentStrategy = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, rackAwareAssignmentStrategy); - } - - private static T validated(final String configKey, final T value) { - final ConfigDef.Validator validator = StreamsConfig.configDef().configKeys().get(configKey).validator; - if (validator != null) { - validator.ensureValid(configKey, value); - } - return value; - } - - @Override - public String toString() { - return "AssignmentConfigs{" + - "\n acceptableRecoveryLag=" + acceptableRecoveryLag + - "\n maxWarmupReplicas=" + maxWarmupReplicas + - "\n numStandbyReplicas=" + numStandbyReplicas + - "\n probingRebalanceIntervalMs=" + probingRebalanceIntervalMs + - "\n rackAwareAssignmentTags=" + rackAwareAssignmentTags + - "\n}"; - } - } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java index 07cf73304ec..218e50f32e1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java @@ -23,7 +23,7 @@ import java.util.function.Function; import java.util.stream.Collectors; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; +import org.apache.kafka.streams.processor.assignment.AssignmentConfigs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +52,9 @@ class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor { private final Function> tagsFunction; public ClientTagAwareStandbyTaskAssignor() { - this((uuid, clientState) -> clientState.clientTags(), assignmentConfigs -> assignmentConfigs.rackAwareAssignmentTags); + this((uuid, clientState) -> clientState.clientTags(), + AssignmentConfigs::rackAwareAssignmentTags + ); } public ClientTagAwareStandbyTaskAssignor(final BiFunction> clientTagFunction, @@ -63,8 +65,8 @@ class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor { /** * The algorithm distributes standby tasks for the {@param statefulTaskIds} over different tag dimensions. - * For each stateful task, the number of standby tasks will be assigned based on configured {@link AssignmentConfigs#numStandbyReplicas}. - * Rack aware standby tasks distribution only takes into account tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags}. + * For each stateful task, the number of standby tasks will be assigned based on configured {@link AssignmentConfigs#numStandbyReplicas()}. + * Rack aware standby tasks distribution only takes into account tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags()}. * Ideally, all standby tasks for any given stateful task will be located on different tag dimensions to have the best possible distribution. * However, if the ideal (or partially ideal) distribution is impossible, the algorithm will fall back to the least-loaded clients without taking rack awareness constraints into consideration. * The least-loaded clients are determined based on the total number of tasks (active and standby tasks) assigned to the client. @@ -73,8 +75,8 @@ class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor { public boolean assign(final Map clients, final Set allTaskIds, final Set statefulTaskIds, - final AssignorConfiguration.AssignmentConfigs configs) { - final int numStandbyReplicas = configs.numStandbyReplicas; + final AssignmentConfigs configs) { + final int numStandbyReplicas = configs.numStandbyReplicas(); final Set rackAwareAssignmentTags = new HashSet<>(tagsFunction.apply(configs)); final Map tasksToRemainingStandbys = computeTasksToRemainingStandbys( diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultStandbyTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultStandbyTaskAssignor.java index 680a056a826..76286762249 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultStandbyTaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultStandbyTaskAssignor.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.processor.internals.assignment; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.assignment.AssignmentConfigs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,8 +41,8 @@ class DefaultStandbyTaskAssignor implements StandbyTaskAssignor { public boolean assign(final Map clients, final Set allTaskIds, final Set statefulTaskIds, - final AssignorConfiguration.AssignmentConfigs configs) { - final int numStandbyReplicas = configs.numStandbyReplicas; + final AssignmentConfigs configs) { + final int numStandbyReplicas = configs.numStandbyReplicas(); final Map tasksToRemainingStandbys = computeTasksToRemainingStandbys(numStandbyReplicas, statefulTaskIds); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignor.java index 8e261eb296b..fa63e0bcb64 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignor.java @@ -17,11 +17,11 @@ package org.apache.kafka.streams.processor.internals.assignment; import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; import java.util.Map; import java.util.Set; import java.util.UUID; +import org.apache.kafka.streams.processor.assignment.AssignmentConfigs; /** * A special task assignor implementation to be used as a fallback in case the diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java index c0f65de4040..541d02e4500 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java @@ -17,8 +17,8 @@ package org.apache.kafka.streams.processor.internals.assignment; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.assignment.AssignmentConfigs; import org.apache.kafka.streams.processor.internals.Task; -import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,12 +67,12 @@ public class HighAvailabilityTaskAssignor implements TaskAssignor { configs ); - final AtomicInteger remainingWarmupReplicas = new AtomicInteger(configs.maxWarmupReplicas); + final AtomicInteger remainingWarmupReplicas = new AtomicInteger(configs.maxWarmupReplicas()); final Map> tasksToCaughtUpClients = tasksToCaughtUpClients( statefulTasks, clientStates, - configs.acceptableRecoveryLag + configs.acceptableRecoveryLag() ); final Map> tasksToClientByLag = tasksToClientByLag(statefulTasks, clientStates); @@ -134,10 +134,8 @@ public class HighAvailabilityTaskAssignor implements TaskAssignor { ); if (rackAwareTaskAssignor.canEnableRackAwareAssignor()) { - final int trafficCost = configs.rackAwareAssignmentTrafficCost == null ? - DEFAULT_HIGH_AVAILABILITY_TRAFFIC_COST : configs.rackAwareAssignmentTrafficCost; - final int nonOverlapCost = configs.rackAwareAssignmentNonOverlapCost == null ? - DEFAULT_HIGH_AVAILABILITY_NON_OVERLAP_COST : configs.rackAwareAssignmentNonOverlapCost; + final int trafficCost = configs.rackAwareTrafficCost().orElse(DEFAULT_HIGH_AVAILABILITY_TRAFFIC_COST); + final int nonOverlapCost = configs.rackAwareNonOverlapCost().orElse(DEFAULT_HIGH_AVAILABILITY_NON_OVERLAP_COST); rackAwareTaskAssignor.optimizeActiveTasks(statefulTasks, clientStates, trafficCost, nonOverlapCost); } } @@ -147,7 +145,7 @@ public class HighAvailabilityTaskAssignor implements TaskAssignor { final Set statefulTasks, final RackAwareTaskAssignor rackAwareTaskAssignor, final AssignmentConfigs configs) { - if (configs.numStandbyReplicas == 0) { + if (configs.numStandbyReplicas() == 0) { return; } @@ -164,10 +162,8 @@ public class HighAvailabilityTaskAssignor implements TaskAssignor { ); if (rackAwareTaskAssignor.canEnableRackAwareAssignor()) { - final int trafficCost = configs.rackAwareAssignmentTrafficCost == null ? - DEFAULT_HIGH_AVAILABILITY_TRAFFIC_COST : configs.rackAwareAssignmentTrafficCost; - final int nonOverlapCost = configs.rackAwareAssignmentNonOverlapCost == null ? - DEFAULT_HIGH_AVAILABILITY_NON_OVERLAP_COST : configs.rackAwareAssignmentNonOverlapCost; + final int trafficCost = configs.rackAwareTrafficCost().orElse(DEFAULT_HIGH_AVAILABILITY_TRAFFIC_COST); + final int nonOverlapCost = configs.rackAwareNonOverlapCost().orElse(DEFAULT_HIGH_AVAILABILITY_NON_OVERLAP_COST); rackAwareTaskAssignor.optimizeStandbyTasks(clientStates, trafficCost, nonOverlapCost, standbyTaskAssignor::isAllowedTaskMovement); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructorFactory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructorFactory.java index 0ae391fc291..ac6d4508602 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructorFactory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructorFactory.java @@ -25,14 +25,14 @@ import java.util.Set; import java.util.TreeMap; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.assignment.AssignmentConfigs; import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; -import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; public class RackAwareGraphConstructorFactory { static RackAwareGraphConstructor create(final AssignmentConfigs assignmentConfigs, final Map> tasksForTopicGroup) { - return create(assignmentConfigs.rackAwareAssignmentStrategy, new ArrayList<>(new TreeMap<>(tasksForTopicGroup).values())); + return create(assignmentConfigs.rackAwareAssignmentStrategy(), new ArrayList<>(new TreeMap<>(tasksForTopicGroup).values())); } public static RackAwareGraphConstructor create(final String rackAwareAssignmentStrategy, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java index 4b430fbb2c1..e7931fa3c3c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java @@ -43,9 +43,9 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.assignment.AssignmentConfigs; import org.apache.kafka.streams.processor.internals.InternalTopicManager; import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; -import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -118,14 +118,14 @@ public class RackAwareTaskAssignor { } public synchronized boolean canEnableRackAwareAssignor() { - if (StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy)) { + if (StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy())) { return false; } if (canEnable != null) { return canEnable; } canEnable = validClientRack && validateTopicPartitionRack(false); - if (assignmentConfigs.numStandbyReplicas == 0 || !canEnable) { + if (assignmentConfigs.numStandbyReplicas() == 0 || !canEnable) { return canEnable; } @@ -240,7 +240,7 @@ public class RackAwareTaskAssignor { KeyValue previousRackInfo = null; for (final Map.Entry> rackEntry : entry.getValue().entrySet()) { if (!rackEntry.getValue().isPresent()) { - if (!StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy)) { + if (!StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy())) { log.error( String.format("RackId doesn't exist for process %s and consumer %s", processId, rackEntry.getKey())); @@ -263,7 +263,7 @@ public class RackAwareTaskAssignor { } } if (previousRackInfo == null) { - if (!StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy)) { + if (!StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy())) { log.error(String.format("RackId doesn't exist for process %s", processId)); } return false; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignor.java index fe28d94e0e1..09f634e96da 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignor.java @@ -20,7 +20,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; +import org.apache.kafka.streams.processor.assignment.AssignmentConfigs; interface StandbyTaskAssignor extends TaskAssignor { default boolean isAllowedTaskMovement(final ClientState source, final ClientState destination) { @@ -53,5 +53,5 @@ interface StandbyTaskAssignor extends TaskAssignor { boolean assign(final Map clients, final Set allTaskIds, final Set statefulTaskIds, - final AssignorConfiguration.AssignmentConfigs configs); + final AssignmentConfigs configs); } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignorFactory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignorFactory.java index 4d47a843042..c6a436e72cf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignorFactory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignorFactory.java @@ -22,13 +22,14 @@ import static org.apache.kafka.common.utils.Utils.mkMap; import java.util.Collections; import java.util.Map; import java.util.UUID; +import org.apache.kafka.streams.processor.assignment.AssignmentConfigs; class StandbyTaskAssignorFactory { private StandbyTaskAssignorFactory() {} - static StandbyTaskAssignor create(final AssignorConfiguration.AssignmentConfigs configs, + static StandbyTaskAssignor create(final AssignmentConfigs configs, final RackAwareTaskAssignor rackAwareTaskAssignor) { - if (!configs.rackAwareAssignmentTags.isEmpty()) { + if (!configs.rackAwareAssignmentTags().isEmpty()) { return new ClientTagAwareStandbyTaskAssignor(); } else if (rackAwareTaskAssignor != null && rackAwareTaskAssignor.validClientRack()) { // racksForProcess should be populated if rackAwareTaskAssignor isn't null diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java index a97bb319ed2..481011f6e1c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java @@ -24,7 +24,7 @@ import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; +import org.apache.kafka.streams.processor.assignment.AssignmentConfigs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,17 +85,15 @@ public class StickyTaskAssignor implements TaskAssignor { assignActive(); optimizeActive(); - assignStandby(configs.numStandbyReplicas); + assignStandby(configs.numStandbyReplicas()); optimizeStandby(); return false; } private void optimizeStandby() { - if (configs.numStandbyReplicas > 0 && rackAwareTaskAssignor != null && rackAwareTaskAssignor.canEnableRackAwareAssignor()) { - final int trafficCost = configs.rackAwareAssignmentTrafficCost == null ? - DEFAULT_STATEFUL_TRAFFIC_COST : configs.rackAwareAssignmentTrafficCost; - final int nonOverlapCost = configs.rackAwareAssignmentNonOverlapCost == null ? - DEFAULT_STATEFUL_NON_OVERLAP_COST : configs.rackAwareAssignmentNonOverlapCost; + if (configs.numStandbyReplicas() > 0 && rackAwareTaskAssignor != null && rackAwareTaskAssignor.canEnableRackAwareAssignor()) { + final int trafficCost = configs.rackAwareTrafficCost().orElse(DEFAULT_STATEFUL_TRAFFIC_COST); + final int nonOverlapCost = configs.rackAwareNonOverlapCost().orElse(DEFAULT_STATEFUL_NON_OVERLAP_COST); final TreeMap clientStates = new TreeMap<>(clients); rackAwareTaskAssignor.optimizeStandbyTasks(clientStates, trafficCost, nonOverlapCost, (s, d, t, c) -> true); @@ -104,10 +102,8 @@ public class StickyTaskAssignor implements TaskAssignor { private void optimizeActive() { if (rackAwareTaskAssignor != null && rackAwareTaskAssignor.canEnableRackAwareAssignor()) { - final int trafficCost = configs.rackAwareAssignmentTrafficCost == null ? - DEFAULT_STATEFUL_TRAFFIC_COST : configs.rackAwareAssignmentTrafficCost; - final int nonOverlapCost = configs.rackAwareAssignmentNonOverlapCost == null ? - DEFAULT_STATEFUL_NON_OVERLAP_COST : configs.rackAwareAssignmentNonOverlapCost; + final int trafficCost = configs.rackAwareTrafficCost().orElse(DEFAULT_STATEFUL_TRAFFIC_COST); + final int nonOverlapCost = configs.rackAwareNonOverlapCost().orElse(DEFAULT_STATEFUL_NON_OVERLAP_COST); final SortedSet statefulTasks = new TreeSet<>(statefulTaskIds); final TreeMap clientStates = new TreeMap<>(clients); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java index c1829efa18f..b86481e66d6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignor.java @@ -21,7 +21,7 @@ import org.apache.kafka.streams.processor.TaskId; import java.util.Map; import java.util.Set; import java.util.UUID; -import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; +import org.apache.kafka.streams.processor.assignment.AssignmentConfigs; public interface TaskAssignor { /** diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java index 82a95e3a990..9d7d298f831 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java @@ -25,9 +25,9 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.processor.assignment.AssignmentConfigs; import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor; -import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration; import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentListener; import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor; import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor; @@ -147,7 +147,7 @@ public class TaskAssignorIntegrationTest { final Field assignmentConfigs = StreamsPartitionAssignor.class.getDeclaredField("assignmentConfigs"); assignmentConfigs.setAccessible(true); - final AssignorConfiguration.AssignmentConfigs configs = (AssignorConfiguration.AssignmentConfigs) assignmentConfigs.get(streamsPartitionAssignor); + final AssignmentConfigs configs = (AssignmentConfigs) assignmentConfigs.get(streamsPartitionAssignor); final Field assignmentListenerField = StreamsPartitionAssignor.class.getDeclaredField("assignmentListener"); assignmentListenerField.setAccessible(true); @@ -159,10 +159,10 @@ public class TaskAssignorIntegrationTest { (Supplier) taskAssignorSupplierField.get(streamsPartitionAssignor); final TaskAssignor taskAssignor = taskAssignorSupplier.get(); - assertThat(configs.numStandbyReplicas, is(5)); - assertThat(configs.acceptableRecoveryLag, is(6L)); - assertThat(configs.maxWarmupReplicas, is(7)); - assertThat(configs.probingRebalanceIntervalMs, is(480000L)); + assertThat(configs.numStandbyReplicas(), is(5)); + assertThat(configs.acceptableRecoveryLag(), is(6L)); + assertThat(configs.maxWarmupReplicas(), is(7)); + assertThat(configs.probingRebalanceIntervalMs(), is(480000L)); assertThat(actualAssignmentListener, sameInstance(configuredAssignmentListener)); assertThat(taskAssignor, instanceOf(MyTaskAssignor.class)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java index 1241ef03fcf..1c23ad789be 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java @@ -39,11 +39,11 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig.InternalConfig; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.assignment.AssignmentConfigs; import org.apache.kafka.streams.processor.internals.InternalTopicManager; import org.apache.kafka.streams.processor.internals.Task; import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; -import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; import org.apache.kafka.test.MockClientSupplier; import org.apache.kafka.test.MockInternalTopicManager; import org.hamcrest.BaseMatcher; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java index 37394e647fb..53c1504a201 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals.assignment; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.internals.UpgradeFromValues; +import org.apache.kafka.streams.processor.assignment.AssignmentConfigs; import org.junit.Before; import org.junit.Test; @@ -45,7 +46,7 @@ public class AssignorConfigurationTest { public void configsShouldRejectZeroWarmups() { final ConfigException exception = assertThrows( ConfigException.class, - () -> new AssignorConfiguration.AssignmentConfigs(1L, 0, 1, 1L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS) + () -> new AssignmentConfigs(1L, 0, 1, 1L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS) ); assertThat(exception.getMessage(), containsString("Invalid value 0 for configuration max.warmup.replicas")); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java index f5bf95ac535..9d95ba767ea 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java @@ -17,7 +17,7 @@ package org.apache.kafka.streams.processor.internals.assignment; import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; +import org.apache.kafka.streams.processor.assignment.AssignmentConfigs; import org.apache.kafka.streams.processor.internals.assignment.ClientTagAwareStandbyTaskAssignor.TagEntry; import org.junit.Before; import org.junit.Test; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignorTest.java index 26ff4685284..5911e13c032 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignorTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.processor.internals.assignment; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.assignment.AssignmentConfigs; import org.junit.Test; import java.util.Collections; @@ -55,7 +56,7 @@ public class FallbackPriorTaskAssignorTest { new HashSet<>(taskIds), new HashSet<>(taskIds), null, - new AssignorConfiguration.AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS) + new AssignmentConfigs(0L, 1, 0, 60_000L, EMPTY_RACK_AWARE_ASSIGNMENT_TAGS) ); assertThat(probingRebalanceNeeded, is(true)); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java index 850f4715b54..c0393f08d09 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java @@ -27,9 +27,9 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.assignment.AssignmentConfigs; import org.apache.kafka.streams.processor.internals.InternalTopicManager; import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; -import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; import org.hamcrest.Matchers; import org.junit.Before; import org.junit.Test; @@ -1459,7 +1459,7 @@ public class HighAvailabilityTaskAssignorTest { final Map>> processRackMap = getRandomProcessRacks(clientSize, nodeSize); final InternalTopicManager mockInternalTopicManager = mockInternalTopicManagerForRandomChangelog(nodeSize, tpSize, partitionSize); - AssignmentConfigs configs = new AssignorConfiguration.AssignmentConfigs( + AssignmentConfigs configs = new AssignmentConfigs( 0L, 1, replicaCount, @@ -1508,7 +1508,7 @@ public class HighAvailabilityTaskAssignorTest { } final SortedMap clientStateMapCopy = copyClientStateMap(clientStateMap); - configs = new AssignorConfiguration.AssignmentConfigs( + configs = new AssignmentConfigs( 0L, 1, replicaCount, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructorFactoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructorFactoryTest.java index a4c17ea3e3c..60505a8e2ad 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructorFactoryTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareGraphConstructorFactoryTest.java @@ -23,7 +23,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.instanceOf; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; +import org.apache.kafka.streams.processor.assignment.AssignmentConfigs; import org.junit.jupiter.api.Test; public class RackAwareGraphConstructorFactoryTest { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java index 22c77177181..c4543c974d3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java @@ -108,8 +108,8 @@ import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.assignment.AssignmentConfigs; import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; -import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; import org.apache.kafka.test.MockClientSupplier; import org.apache.kafka.test.MockInternalTopicManager; import org.junit.Before; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignorFactoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignorFactoryTest.java index ef3f0fbf4c2..8a0f6d1bcfa 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignorFactoryTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StandbyTaskAssignorFactoryTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals.assignment; import java.util.Arrays; import java.util.Collection; +import org.apache.kafka.streams.processor.assignment.AssignmentConfigs; import org.junit.Before; import org.junit.Test; @@ -103,11 +104,11 @@ public class StandbyTaskAssignorFactoryTest { } } - private static AssignorConfiguration.AssignmentConfigs newAssignmentConfigs(final List rackAwareAssignmentTags) { - return new AssignorConfiguration.AssignmentConfigs(ACCEPTABLE_RECOVERY_LAG, - MAX_WARMUP_REPLICAS, - NUMBER_OF_STANDBY_REPLICAS, - PROBING_REBALANCE_INTERVAL_MS, - rackAwareAssignmentTags); + private static AssignmentConfigs newAssignmentConfigs(final List rackAwareAssignmentTags) { + return new AssignmentConfigs(ACCEPTABLE_RECOVERY_LAG, + MAX_WARMUP_REPLICAS, + NUMBER_OF_STANDBY_REPLICAS, + PROBING_REBALANCE_INTERVAL_MS, + rackAwareAssignmentTags); } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java index 9a3c7ddc199..65b5ee84a90 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java @@ -26,9 +26,9 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.assignment.AssignmentConfigs; import org.apache.kafka.streams.processor.internals.InternalTopicManager; import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; -import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; import org.junit.Before; import org.junit.Test; @@ -463,7 +463,7 @@ public class StickyTaskAssignorTest { final Map> changelogPartitionsForTask = getTaskTopicPartitionMap(topicSize, partitionSize, true); final Map>> racksForProcessConsumer = getRandomProcessRacks(clientSize, nodeSize); final InternalTopicManager internalTopicManager = mockInternalTopicManagerForRandomChangelog(nodeSize, topicSize, partitionSize); - final AssignmentConfigs configs = new AssignorConfiguration.AssignmentConfigs( + final AssignmentConfigs configs = new AssignmentConfigs( 0L, 1, 1, @@ -786,7 +786,7 @@ public class StickyTaskAssignorTest { final Map>> racksForProcessConsumer = getRandomProcessRacks(clientSize, nodeSize); final InternalTopicManager internalTopicManager = mockInternalTopicManagerForRandomChangelog(nodeSize, topicSize, partitionSize); - final AssignmentConfigs configs = new AssignorConfiguration.AssignmentConfigs( + final AssignmentConfigs configs = new AssignmentConfigs( 0L, 1, 0, @@ -836,7 +836,7 @@ public class StickyTaskAssignorTest { final Map>> racksForProcessConsumer = getProcessRacksForAllProcess(); final InternalTopicManager internalTopicManager = mockInternalTopicManagerForChangelog(); - final AssignmentConfigs configs = new AssignorConfiguration.AssignmentConfigs( + final AssignmentConfigs configs = new AssignmentConfigs( 0L, 1, 1, @@ -907,7 +907,7 @@ public class StickyTaskAssignorTest { final int maxCapacity = 3; final SortedMap> taskTopicPartitionMap = getTaskTopicPartitionMap( tpSize, partitionSize, false); - final AssignmentConfigs configs = new AssignorConfiguration.AssignmentConfigs( + final AssignmentConfigs configs = new AssignmentConfigs( 0L, 1, replicaCount, @@ -976,7 +976,7 @@ public class StickyTaskAssignorTest { final Map>> processRackMap = getRandomProcessRacks(clientSize, nodeSize); final InternalTopicManager mockInternalTopicManager = mockInternalTopicManagerForRandomChangelog(nodeSize, tpSize, partitionSize); - AssignmentConfigs configs = new AssignorConfiguration.AssignmentConfigs( + AssignmentConfigs configs = new AssignmentConfigs( 0L, 1, replicaCount, @@ -1025,7 +1025,7 @@ public class StickyTaskAssignorTest { } final SortedMap clientStateMapCopy = copyClientStateMap(clientStateMap); - configs = new AssignorConfiguration.AssignmentConfigs( + configs = new AssignmentConfigs( 0L, 1, replicaCount, @@ -1068,7 +1068,7 @@ public class StickyTaskAssignorTest { private boolean assign(final int numStandbys, final TaskId... tasks) { final List taskIds = asList(tasks); Collections.shuffle(taskIds); - final AssignmentConfigs configs = new AssignorConfiguration.AssignmentConfigs( + final AssignmentConfigs configs = new AssignmentConfigs( 0L, 1, numStandbys, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java index 930c8e8be39..8b791953f30 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java @@ -33,9 +33,9 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.assignment.AssignmentConfigs; import org.apache.kafka.streams.processor.internals.InternalTopicManager; import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; -import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; import org.apache.kafka.test.MockClientSupplier; import org.apache.kafka.test.MockInternalTopicManager; import org.junit.Before;