MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor (#9384)

Currently, we pass multiple object references (AdminClient, TaskManager, and a few more) into StreamsPartitionAssignor. Furthermore, we (mis)use TaskManager#mainConsumer() to get access to the main consumer (we need to do this to avoid a cyclic dependency).

This PR unifies how object references are passed into a single ReferenceContainer class to
 - not "misuse" the TaskManager as a reference container
 - unify how object references are passed

Note: we need to use a reference container to avoid cyclic dependencies, instead of using a config for each passed reference individually.

Reviewers: John Roesler <john@confluent.io>
This commit is contained in:
Matthias J. Sax 2020-10-15 16:10:27 -07:00 committed by GitHub
parent a85802faa1
commit e8ad80ebe1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 153 additions and 269 deletions

View File

@ -144,7 +144,7 @@
<!-- Streams -->
<suppress checks="ClassFanOutComplexity"
files="(KafkaStreams|KStreamImpl|KTableImpl).java"/>
files="(KafkaStreams|KStreamImpl|KTableImpl|StreamsPartitionAssignor).java"/>
<suppress checks="MethodLength"
files="KTableImpl.java"/>

View File

@ -900,12 +900,7 @@ public class StreamsConfig extends AbstractConfig {
// These are not settable in the main Streams config; they are set by the StreamThread to pass internal
// state into the assignor.
public static final String TASK_MANAGER_FOR_PARTITION_ASSIGNOR = "__task.manager.instance__";
public static final String STREAMS_METADATA_STATE_FOR_PARTITION_ASSIGNOR = "__streams.metadata.state.instance__";
public static final String STREAMS_ADMIN_CLIENT = "__streams.admin.client.instance__";
public static final String ASSIGNMENT_ERROR_CODE = "__assignment.error.code__";
public static final String NEXT_SCHEDULED_REBALANCE_MS = "__next.probing.rebalance.ms__";
public static final String TIME = "__time__";
public static final String REFERENCE_CONTAINER_PARTITION_ASSIGNOR = "__reference.container.instance__";
// This is settable in the main Streams config, but it's a private API for testing
public static final String ASSIGNMENT_LISTENER = "__assignment.listener__";

View File

@ -40,6 +40,7 @@ import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TaskMetadata;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
@ -300,6 +301,11 @@ public class StreamThread extends Thread {
final LogContext logContext = new LogContext(logPrefix);
final Logger log = logContext.logger(StreamThread.class);
final ReferenceContainer referenceContainer = new ReferenceContainer();
referenceContainer.adminClient = adminClient;
referenceContainer.streamsMetadataState = streamsMetadataState;
referenceContainer.time = time;
log.info("Creating restore consumer client");
final Map<String, Object> restoreConsumerConfigs = config.getRestoreConsumerConfigs(getRestoreConsumerClientId(threadId));
final Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(restoreConsumerConfigs);
@ -348,18 +354,12 @@ public class StreamThread extends Thread {
stateDirectory,
StreamThread.processingMode(config)
);
referenceContainer.taskManager = taskManager;
log.info("Creating consumer client");
final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
final Map<String, Object> consumerConfigs = config.getMainConsumerConfigs(applicationId, getConsumerClientId(threadId), threadIdx);
consumerConfigs.put(StreamsConfig.InternalConfig.TIME, time);
consumerConfigs.put(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, taskManager);
consumerConfigs.put(StreamsConfig.InternalConfig.STREAMS_METADATA_STATE_FOR_PARTITION_ASSIGNOR, streamsMetadataState);
consumerConfigs.put(StreamsConfig.InternalConfig.STREAMS_ADMIN_CLIENT, adminClient);
final AtomicInteger assignmentErrorCode = new AtomicInteger();
consumerConfigs.put(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE, assignmentErrorCode);
final AtomicLong nextScheduledRebalanceMs = new AtomicLong(Long.MAX_VALUE);
consumerConfigs.put(StreamsConfig.InternalConfig.NEXT_SCHEDULED_REBALANCE_MS, nextScheduledRebalanceMs);
consumerConfigs.put(StreamsConfig.InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, referenceContainer);
final String originalReset = (String) consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
// If there are any overrides, we never fall through to the consumer, but only handle offset management ourselves.
@ -370,6 +370,7 @@ public class StreamThread extends Thread {
final Consumer<byte[], byte[]> mainConsumer = clientSupplier.getConsumer(consumerConfigs);
changelogReader.setMainConsumer(mainConsumer);
taskManager.setMainConsumer(mainConsumer);
referenceContainer.mainConsumer = mainConsumer;
final StreamThread streamThread = new StreamThread(
time,
@ -384,8 +385,8 @@ public class StreamThread extends Thread {
builder,
threadId,
logContext,
assignmentErrorCode,
nextScheduledRebalanceMs
referenceContainer.assignmentErrorCode,
referenceContainer.nextScheduledRebalanceMs
);
taskManager.setPartitionResetter(partitions -> streamThread.resetOffsets(partitions, null));
@ -1091,8 +1092,4 @@ public class StreamThread extends Thread {
Admin adminClient() {
return adminClient;
}
InternalTopologyBuilder internalTopologyBuilder() {
return builder;
}
}

View File

@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
@ -43,6 +44,7 @@ import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.CopartitionedTopicsEnforcer;
import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
@ -60,6 +62,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Queue;
@ -165,6 +168,9 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
private String userEndPoint;
private AssignmentConfigs assignmentConfigs;
// for the main consumer, we need to use a supplier to break a cyclic setup dependency
private Supplier<Consumer<byte[], byte[]>> mainConsumerSupplier;
private Admin adminClient;
private TaskManager taskManager;
private StreamsMetadataState streamsMetadataState;
@SuppressWarnings("deprecation")
@ -175,7 +181,6 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
protected int usedSubscriptionMetadataVersion = LATEST_SUPPORTED_VERSION;
private Admin adminClient;
private InternalTopicManager internalTopicManager;
private CopartitionedTopicsEnforcer copartitionedTopicsEnforcer;
private RebalanceProtocol rebalanceProtocol;
@ -196,17 +201,19 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
logPrefix = assignorConfiguration.logPrefix();
log = new LogContext(logPrefix).logger(getClass());
usedSubscriptionMetadataVersion = assignorConfiguration
.configuredMetadataVersion(usedSubscriptionMetadataVersion);
taskManager = assignorConfiguration.taskManager();
streamsMetadataState = assignorConfiguration.streamsMetadataState();
assignmentErrorCode = assignorConfiguration.assignmentErrorCode();
nextScheduledRebalanceMs = assignorConfiguration.nextScheduledRebalanceMs();
time = assignorConfiguration.time();
usedSubscriptionMetadataVersion = assignorConfiguration.configuredMetadataVersion(usedSubscriptionMetadataVersion);
final ReferenceContainer referenceContainer = assignorConfiguration.referenceContainer();
mainConsumerSupplier = () -> Objects.requireNonNull(referenceContainer.mainConsumer, "Main consumer was not specified");
adminClient = Objects.requireNonNull(referenceContainer.adminClient, "Admin client was not specified");
taskManager = Objects.requireNonNull(referenceContainer.taskManager, "TaskManager was not specified");
streamsMetadataState = Objects.requireNonNull(referenceContainer.streamsMetadataState, "StreamsMetadataState was not specified");
assignmentErrorCode = referenceContainer.assignmentErrorCode;
nextScheduledRebalanceMs = referenceContainer.nextScheduledRebalanceMs;
time = Objects.requireNonNull(referenceContainer.time, "Time was not specified");
assignmentConfigs = assignorConfiguration.assignmentConfigs();
partitionGrouper = assignorConfiguration.partitionGrouper();
userEndPoint = assignorConfiguration.userEndPoint();
adminClient = assignorConfiguration.adminClient();
internalTopicManager = assignorConfiguration.internalTopicManager();
copartitionedTopicsEnforcer = assignorConfiguration.copartitionedTopicsEnforcer();
rebalanceProtocol = assignorConfiguration.rebalanceProtocol();
@ -806,7 +813,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
fetchEndOffsetsFuture(preexistingChangelogPartitions, adminClient);
final Map<TopicPartition, Long> sourceChangelogEndOffsets =
fetchCommittedOffsets(preexistingSourceChangelogPartitions, taskManager.mainConsumer());
fetchCommittedOffsets(preexistingSourceChangelogPartitions, mainConsumerSupplier.get());
final Map<TopicPartition, ListOffsetsResultInfo> endOffsets = ClientUtils.getEndOffsets(endOffsetsFuture);

View File

@ -120,10 +120,6 @@ public class TaskManager {
this.mainConsumer = mainConsumer;
}
Consumer<byte[], byte[]> mainConsumer() {
return mainConsumer;
}
public UUID processId() {
return processId;
}
@ -204,7 +200,7 @@ public class TaskManager {
// for this task until it has been re-initialized;
// Note, closeDirty already clears the partitiongroup for the task.
if (task.isActive()) {
final Set<TopicPartition> currentAssignment = mainConsumer().assignment();
final Set<TopicPartition> currentAssignment = mainConsumer.assignment();
final Set<TopicPartition> taskInputPartitions = task.inputPartitions();
final Set<TopicPartition> assignedToPauseAndReset =
intersection(HashSet::new, currentAssignment, taskInputPartitions);
@ -217,12 +213,12 @@ public class TaskManager {
);
}
mainConsumer().pause(assignedToPauseAndReset);
final Map<TopicPartition, OffsetAndMetadata> committed = mainConsumer().committed(assignedToPauseAndReset);
mainConsumer.pause(assignedToPauseAndReset);
final Map<TopicPartition, OffsetAndMetadata> committed = mainConsumer.committed(assignedToPauseAndReset);
for (final Map.Entry<TopicPartition, OffsetAndMetadata> committedEntry : committed.entrySet()) {
final OffsetAndMetadata offsetAndMetadata = committedEntry.getValue();
if (offsetAndMetadata != null) {
mainConsumer().seek(committedEntry.getKey(), offsetAndMetadata);
mainConsumer.seek(committedEntry.getKey(), offsetAndMetadata);
assignedToPauseAndReset.remove(committedEntry.getKey());
}
}

View File

@ -17,25 +17,19 @@
package org.apache.kafka.streams.processor.internals.assignment;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsConfig.InternalConfig;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.TaskManager;
import org.slf4j.Logger;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.kafka.common.utils.Utils.getHost;
import static org.apache.kafka.common.utils.Utils.getPort;
@ -47,8 +41,7 @@ public final class AssignorConfiguration {
private final String logPrefix;
private final Logger log;
private final TaskManager taskManager;
private final Admin adminClient;
private final ReferenceContainer referenceContainer;
private final StreamsConfig streamsConfig;
private final Map<String, ?> internalConfigs;
@ -66,41 +59,22 @@ public final class AssignorConfiguration {
log = logContext.logger(getClass());
{
final Object o = configs.get(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR);
final Object o = configs.get(InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR);
if (o == null) {
final KafkaException fatalException = new KafkaException("TaskManager is not specified");
final KafkaException fatalException = new KafkaException("ReferenceContainer is not specified");
log.error(fatalException.getMessage(), fatalException);
throw fatalException;
}
if (!(o instanceof TaskManager)) {
if (!(o instanceof ReferenceContainer)) {
final KafkaException fatalException = new KafkaException(
String.format("%s is not an instance of %s", o.getClass().getName(), TaskManager.class.getName())
String.format("%s is not an instance of %s", o.getClass().getName(), ReferenceContainer.class.getName())
);
log.error(fatalException.getMessage(), fatalException);
throw fatalException;
}
taskManager = (TaskManager) o;
}
{
final Object o = configs.get(StreamsConfig.InternalConfig.STREAMS_ADMIN_CLIENT);
if (o == null) {
final KafkaException fatalException = new KafkaException("Admin is not specified");
log.error(fatalException.getMessage(), fatalException);
throw fatalException;
}
if (!(o instanceof Admin)) {
final KafkaException fatalException = new KafkaException(
String.format("%s is not an instance of %s", o.getClass().getName(), Admin.class.getName())
);
log.error(fatalException.getMessage(), fatalException);
throw fatalException;
}
adminClient = (Admin) o;
referenceContainer = (ReferenceContainer) o;
}
{
@ -113,83 +87,8 @@ public final class AssignorConfiguration {
}
}
public AtomicInteger assignmentErrorCode() {
final Object ai = internalConfigs.get(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE);
if (ai == null) {
final KafkaException fatalException = new KafkaException("assignmentErrorCode is not specified");
log.error(fatalException.getMessage(), fatalException);
throw fatalException;
}
if (!(ai instanceof AtomicInteger)) {
final KafkaException fatalException = new KafkaException(
String.format("%s is not an instance of %s", ai.getClass().getName(), AtomicInteger.class.getName())
);
log.error(fatalException.getMessage(), fatalException);
throw fatalException;
}
return (AtomicInteger) ai;
}
public AtomicLong nextScheduledRebalanceMs() {
final Object al = internalConfigs.get(InternalConfig.NEXT_SCHEDULED_REBALANCE_MS);
if (al == null) {
final KafkaException fatalException = new KafkaException("nextProbingRebalanceMs is not specified");
log.error(fatalException.getMessage(), fatalException);
throw fatalException;
}
if (!(al instanceof AtomicLong)) {
final KafkaException fatalException = new KafkaException(
String.format("%s is not an instance of %s", al.getClass().getName(), AtomicLong.class.getName())
);
log.error(fatalException.getMessage(), fatalException);
throw fatalException;
}
return (AtomicLong) al;
}
public Time time() {
final Object t = internalConfigs.get(InternalConfig.TIME);
if (t == null) {
final KafkaException fatalException = new KafkaException("time is not specified");
log.error(fatalException.getMessage(), fatalException);
throw fatalException;
}
if (!(t instanceof Time)) {
final KafkaException fatalException = new KafkaException(
String.format("%s is not an instance of %s", t.getClass().getName(), Time.class.getName())
);
log.error(fatalException.getMessage(), fatalException);
throw fatalException;
}
return (Time) t;
}
public TaskManager taskManager() {
return taskManager;
}
public StreamsMetadataState streamsMetadataState() {
final Object o = internalConfigs.get(StreamsConfig.InternalConfig.STREAMS_METADATA_STATE_FOR_PARTITION_ASSIGNOR);
if (o == null) {
final KafkaException fatalException = new KafkaException("StreamsMetadataState is not specified");
log.error(fatalException.getMessage(), fatalException);
throw fatalException;
}
if (!(o instanceof StreamsMetadataState)) {
final KafkaException fatalException = new KafkaException(
String.format("%s is not an instance of %s", o.getClass().getName(), StreamsMetadataState.class.getName())
);
log.error(fatalException.getMessage(), fatalException);
throw fatalException;
}
return (StreamsMetadataState) o;
public ReferenceContainer referenceContainer() {
return referenceContainer;
}
public RebalanceProtocol rebalanceProtocol() {
@ -291,12 +190,8 @@ public final class AssignorConfiguration {
}
}
public Admin adminClient() {
return adminClient;
}
public InternalTopicManager internalTopicManager() {
return new InternalTopicManager(time(), adminClient, streamsConfig);
return new InternalTopicManager(referenceContainer.time, referenceContainer.adminClient, streamsConfig);
}
public CopartitionedTopicsEnforcer copartitionedTopicsEnforcer() {

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals.assignment;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.TaskManager;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Mutable holder for the object references that the {@code StreamThread} shares with the
 * {@code StreamsPartitionAssignor}. A single instance is placed into the main consumer's
 * config (under {@code InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR}) and read
 * back by the assignor during {@code configure()}. A single container is used instead of
 * one config entry per reference to break the cyclic setup dependency between the thread
 * and the assignor.
 *
 * <p>Fields are deliberately public and mutable: the thread fills them in as the
 * corresponding objects become available during startup (e.g. {@code mainConsumer} is set
 * only after the consumer — and thus the assignor — has been created), so readers must be
 * prepared for {@code null} until setup completes.
 */
public class ReferenceContainer {
// set last: the main consumer is created after this container is already in its config,
// which is exactly the cyclic dependency the container exists to break
public Consumer<byte[], byte[]> mainConsumer;
public Admin adminClient;
public TaskManager taskManager;
public StreamsMetadataState streamsMetadataState;
// pre-initialized shared state, never null: error code reported by the assignor,
// and the next probing-rebalance deadline (MAX_VALUE means "none scheduled")
public final AtomicInteger assignmentErrorCode = new AtomicInteger();
public final AtomicLong nextScheduledRebalanceMs = new AtomicLong(Long.MAX_VALUE);
public Time time;
}

View File

@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.integration;
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Partitioner;
@ -65,6 +64,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;

View File

@ -20,6 +20,7 @@ import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
@ -35,7 +36,7 @@ import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockClientSupplier;
@ -52,8 +53,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import static java.util.Arrays.asList;
@ -111,21 +110,20 @@ public class HighAvailabilityStreamsPartitionAssignorTest {
private final StreamsMetadataState streamsMetadataState = EasyMock.createNiceMock(StreamsMetadataState.class);
private final Map<String, Subscription> subscriptions = new HashMap<>();
private final AtomicInteger assignmentError = new AtomicInteger();
private final AtomicLong nextProbingRebalanceMs = new AtomicLong(Long.MAX_VALUE);
private ReferenceContainer referenceContainer;
private final MockTime time = new MockTime();
private Map<String, Object> configProps() {
final Map<String, Object> configurationMap = new HashMap<>();
configurationMap.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
configurationMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, USER_END_POINT);
configurationMap.put(InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, taskManager);
configurationMap.put(InternalConfig.STREAMS_METADATA_STATE_FOR_PARTITION_ASSIGNOR, streamsMetadataState);
configurationMap.put(InternalConfig.STREAMS_ADMIN_CLIENT, adminClient);
configurationMap.put(InternalConfig.ASSIGNMENT_ERROR_CODE, assignmentError);
configurationMap.put(InternalConfig.NEXT_SCHEDULED_REBALANCE_MS, nextProbingRebalanceMs);
configurationMap.put(InternalConfig.TIME, time);
configurationMap.put(InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, HighAvailabilityTaskAssignor.class.getName());
referenceContainer = new ReferenceContainer();
referenceContainer.mainConsumer = EasyMock.mock(Consumer.class);
referenceContainer.adminClient = adminClient;
referenceContainer.taskManager = taskManager;
referenceContainer.streamsMetadataState = streamsMetadataState;
referenceContainer.time = time;
configurationMap.put(InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, referenceContainer);
return configurationMap;
}
@ -286,7 +284,7 @@ public class HighAvailabilityStreamsPartitionAssignorTest {
assertThat(firstConsumerActiveTasks, equalTo(new ArrayList<>(allTasks)));
assertThat(newConsumerActiveTasks, empty());
assertThat(assignmentError.get(), equalTo(AssignorError.NONE.code()));
assertThat(referenceContainer.assignmentErrorCode.get(), equalTo(AssignorError.NONE.code()));
final long nextScheduledRebalanceOnThisClient =
AssignmentInfo.decode(assignments.get(firstConsumer).userData()).nextRebalanceMs();

View File

@ -64,6 +64,7 @@ import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueStore;
@ -555,7 +556,9 @@ public class StreamThreadTest {
}
AtomicLong nextRebalanceMs() {
return (AtomicLong) consumerConfigs.get(StreamsConfig.InternalConfig.NEXT_SCHEDULED_REBALANCE_MS);
return ((ReferenceContainer) consumerConfigs.get(
StreamsConfig.InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR)
).nextScheduledRebalanceMs;
}
}

View File

@ -16,24 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;
import static java.util.Collections.emptySet;
import static java.util.Collections.singletonList;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_TASKS;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.createMockAdminClientForAssignor;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getInfo;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.uuidForInt;
import static org.easymock.EasyMock.expect;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
@ -49,6 +31,7 @@ import org.apache.kafka.streams.StreamsConfig.InternalConfig;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
import org.apache.kafka.test.IntegrationTest;
@ -62,6 +45,23 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import static java.util.Collections.emptySet;
import static java.util.Collections.singletonList;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_TASKS;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.createMockAdminClientForAssignor;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getInfo;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.uuidForInt;
import static org.easymock.EasyMock.expect;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@Category({IntegrationTest.class})
public class StreamsAssignmentScaleTest {
final static long MAX_ASSIGNMENT_DURATION = 60 * 1000L; //each individual assignment should complete within 20s
@ -170,21 +170,19 @@ public class StreamsAssignmentScaleTest {
final Consumer<byte[], byte[]> mainConsumer = EasyMock.createNiceMock(Consumer.class);
final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
expect(taskManager.builder()).andStubReturn(builder);
expect(taskManager.mainConsumer()).andStubReturn(mainConsumer);
expect(mainConsumer.committed(new HashSet<>())).andStubReturn(Collections.emptyMap());
final AdminClient adminClient = createMockAdminClientForAssignor(changelogEndOffsets);
final StreamsPartitionAssignor partitionAssignor = new StreamsPartitionAssignor();
final Map<String, Object> configMap = new HashMap<>();
configMap.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
configMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8080");
configMap.put(InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, taskManager);
configMap.put(InternalConfig.STREAMS_METADATA_STATE_FOR_PARTITION_ASSIGNOR, EasyMock.createNiceMock(StreamsMetadataState.class));
configMap.put(InternalConfig.STREAMS_ADMIN_CLIENT, adminClient);
configMap.put(InternalConfig.ASSIGNMENT_ERROR_CODE, new AtomicInteger());
configMap.put(InternalConfig.NEXT_SCHEDULED_REBALANCE_MS, new AtomicLong(Long.MAX_VALUE));
configMap.put(InternalConfig.TIME, new MockTime());
final ReferenceContainer referenceContainer = new ReferenceContainer();
referenceContainer.mainConsumer = mainConsumer;
referenceContainer.adminClient = adminClient;
referenceContainer.taskManager = taskManager;
referenceContainer.streamsMetadataState = EasyMock.createNiceMock(StreamsMetadataState.class);
referenceContainer.time = new MockTime();
configMap.put(InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, referenceContainer);
configMap.put(InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, taskAssignor.getName());
configMap.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, numStandbys);
@ -194,9 +192,10 @@ public class StreamsAssignmentScaleTest {
new MockClientSupplier().restoreConsumer,
false
);
partitionAssignor.configure(configMap);
EasyMock.replay(taskManager, adminClient, mainConsumer);
final StreamsPartitionAssignor partitionAssignor = new StreamsPartitionAssignor();
partitionAssignor.configure(configMap);
partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
final Map<String, Subscription> subscriptions = new HashMap<>();

View File

@ -52,6 +52,7 @@ import org.apache.kafka.streams.processor.internals.assignment.AssignorConfigura
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
@ -77,8 +78,6 @@ import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import static java.time.Duration.ofMillis;
@ -111,6 +110,7 @@ import static org.apache.kafka.streams.processor.internals.assignment.Assignment
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getInfo;
import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.mock;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
@ -181,20 +181,19 @@ public class StreamsPartitionAssignorTest {
private final Map<String, Subscription> subscriptions = new HashMap<>();
private final Class<? extends TaskAssignor> taskAssignor;
private final AtomicInteger assignmentError = new AtomicInteger();
private final AtomicLong nextScheduledRebalanceMs = new AtomicLong(Long.MAX_VALUE);
private final ReferenceContainer referenceContainer = new ReferenceContainer();
private final MockTime time = new MockTime();
private Map<String, Object> configProps() {
final Map<String, Object> configurationMap = new HashMap<>();
configurationMap.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
configurationMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, USER_END_POINT);
configurationMap.put(InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, taskManager);
configurationMap.put(InternalConfig.STREAMS_METADATA_STATE_FOR_PARTITION_ASSIGNOR, streamsMetadataState);
configurationMap.put(InternalConfig.STREAMS_ADMIN_CLIENT, adminClient);
configurationMap.put(InternalConfig.ASSIGNMENT_ERROR_CODE, assignmentError);
configurationMap.put(InternalConfig.NEXT_SCHEDULED_REBALANCE_MS, nextScheduledRebalanceMs);
configurationMap.put(InternalConfig.TIME, time);
referenceContainer.mainConsumer = mock(Consumer.class);
referenceContainer.adminClient = adminClient != null ? adminClient : mock(Admin.class);
referenceContainer.taskManager = taskManager;
referenceContainer.streamsMetadataState = streamsMetadataState;
referenceContainer.time = time;
configurationMap.put(InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, referenceContainer);
configurationMap.put(InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, taskAssignor.getName());
return configurationMap;
}
@ -1341,11 +1340,11 @@ public class StreamsPartitionAssignorTest {
partitionAssignor.onAssignment(createAssignment(oldHostState), null);
assertThat(nextScheduledRebalanceMs.get(), is(0L));
assertThat(referenceContainer.nextScheduledRebalanceMs.get(), is(0L));
partitionAssignor.onAssignment(createAssignment(newHostState), null);
assertThat(nextScheduledRebalanceMs.get(), is(Long.MAX_VALUE));
assertThat(referenceContainer.nextScheduledRebalanceMs.get(), is(Long.MAX_VALUE));
}
@Test
@ -1383,7 +1382,7 @@ public class StreamsPartitionAssignorTest {
partitionAssignor.onAssignment(assignment.get(CONSUMER_2), null);
assertThat(nextScheduledRebalanceMs.get(), is(0L));
assertThat(referenceContainer.nextScheduledRebalanceMs.get(), is(0L));
}
@Test
@ -1433,59 +1432,30 @@ public class StreamsPartitionAssignorTest {
}
@Test
public void shouldThrowKafkaExceptionIfTaskMangerNotConfigured() {
public void shouldThrowKafkaExceptionIfReferenceContainerNotConfigured() {
final Map<String, Object> config = configProps();
config.remove(InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR);
config.remove(InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR);
try {
partitionAssignor.configure(config);
fail("Should have thrown KafkaException");
} catch (final KafkaException expected) {
assertThat(expected.getMessage(), equalTo("TaskManager is not specified"));
}
final KafkaException expected = assertThrows(
KafkaException.class,
() -> partitionAssignor.configure(config)
);
assertThat(expected.getMessage(), equalTo("ReferenceContainer is not specified"));
}
@Test
public void shouldThrowKafkaExceptionIfTaskMangerConfigIsNotTaskManagerInstance() {
public void shouldThrowKafkaExceptionIfReferenceContainerConfigIsNotTaskManagerInstance() {
final Map<String, Object> config = configProps();
config.put(InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, "i am not a task manager");
config.put(InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, "i am not a reference container");
try {
partitionAssignor.configure(config);
fail("Should have thrown KafkaException");
} catch (final KafkaException expected) {
assertThat(expected.getMessage(),
equalTo("java.lang.String is not an instance of org.apache.kafka.streams.processor.internals.TaskManager"));
}
}
@Test
public void shouldThrowKafkaExceptionAssignmentErrorCodeNotConfigured() {
createDefaultMockTaskManager();
final Map<String, Object> config = configProps();
config.remove(InternalConfig.ASSIGNMENT_ERROR_CODE);
try {
partitionAssignor.configure(config);
fail("Should have thrown KafkaException");
} catch (final KafkaException expected) {
assertThat(expected.getMessage(), equalTo("assignmentErrorCode is not specified"));
}
}
@Test
public void shouldThrowKafkaExceptionIfVersionProbingFlagConfigIsNotAtomicInteger() {
createDefaultMockTaskManager();
final Map<String, Object> config = configProps();
config.put(InternalConfig.ASSIGNMENT_ERROR_CODE, "i am not an AtomicInteger");
try {
partitionAssignor.configure(config);
fail("Should have thrown KafkaException");
} catch (final KafkaException expected) {
assertThat(expected.getMessage(),
equalTo("java.lang.String is not an instance of java.util.concurrent.atomic.AtomicInteger"));
}
final KafkaException expected = assertThrows(
KafkaException.class,
() -> partitionAssignor.configure(config)
);
assertThat(
expected.getMessage(),
equalTo("java.lang.String is not an instance of org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer")
);
}
@Test
@ -1754,17 +1724,6 @@ public class StreamsPartitionAssignorTest {
assertThat(partitionAssignor.probingRebalanceIntervalMs(), equalTo(55 * 60 * 1000L));
}
@Test
public void shouldGetNextProbingRebalanceMs() {
nextScheduledRebalanceMs.set(5 * 60 * 1000L);
createDefaultMockTaskManager();
final Map<String, Object> props = configProps();
final AssignorConfiguration assignorConfiguration = new AssignorConfiguration(props);
assertThat(assignorConfiguration.nextScheduledRebalanceMs().get(), equalTo(5 * 60 * 1000L));
}
@Test
public void shouldGetTime() {
time.setCurrentTimeMs(Long.MAX_VALUE);
@ -1773,7 +1732,7 @@ public class StreamsPartitionAssignorTest {
final Map<String, Object> props = configProps();
final AssignorConfiguration assignorConfiguration = new AssignorConfiguration(props);
assertThat(assignorConfiguration.time().milliseconds(), equalTo(Long.MAX_VALUE));
assertThat(assignorConfiguration.referenceContainer().time.milliseconds(), equalTo(Long.MAX_VALUE));
}
@Test
@ -1916,7 +1875,6 @@ public class StreamsPartitionAssignorTest {
final Consumer<byte[], byte[]> consumerClient = EasyMock.createMock(Consumer.class);
createDefaultMockTaskManager();
EasyMock.expect(taskManager.mainConsumer()).andStubReturn(consumerClient);
configurePartitionAssignorWith(singletonMap(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE));
overwriteInternalTopicManagerWithMock(false);

View File

@ -138,7 +138,7 @@ public class StreamsUpgradeTest {
usedSubscriptionMetadataVersionPeek = new AtomicInteger();
}
configs.remove("test.future.metadata");
nextScheduledRebalanceMs = new AssignorConfiguration(configs).nextScheduledRebalanceMs();
nextScheduledRebalanceMs = new AssignorConfiguration(configs).referenceContainer().nextScheduledRebalanceMs;
super.configure(configs);
}