KAFKA-15575: Begin enforcing 'tasks.max' property for connectors (#15180)

Reviewers: Ashwin Pankaj <apankaj@confluent.io>, Greg Harris <greg.harris@aiven.io>
Chris Egerton authored on 2024-02-01 11:33:04 -05:00; committed by GitHub
parent 8da6508966
commit 4f0a405908
16 changed files with 721 additions and 42 deletions

View File: checkstyle/suppressions.xml

@ -151,7 +151,7 @@
files="LoggingResource.java" />
<suppress checks="ClassDataAbstractionCoupling"
files="(RestServer|AbstractHerder|DistributedHerder|Worker).java"/>
files="(RestServer|AbstractHerder|DistributedHerder|Worker(Test)?).java"/>
<suppress checks="BooleanExpressionComplexity"
files="JsonConverter.java"/>

View File: LogCaptureAppender.java

@ -24,6 +24,7 @@ import org.apache.log4j.spi.LoggingEvent;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
public class LogCaptureAppender extends AppenderSkeleton implements AutoCloseable {
private final List<LoggingEvent> events = new LinkedList<>();
@ -100,6 +101,13 @@ public class LogCaptureAppender extends AppenderSkeleton implements AutoCloseabl
}
}
public List<String> getMessages(String level) {
return getEvents().stream()
.filter(e -> level.equals(e.getLevel().toString()))
.map(LoggingEvent::getRenderedMessage)
.collect(Collectors.toList());
}
public List<String> getMessages() {
final LinkedList<String> result = new LinkedList<>();
synchronized (events) {

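The level-filtered accessor makes it easy to assert on captured log output. A minimal usage sketch, assuming the usual JUnit assertions and a test that exercises Worker code (the surrounding test method is hypothetical):

try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(Worker.class)) {
    // ... exercise code that may log warnings ...
    List<String> warnings = appender.getMessages("WARN");
    assertEquals(Collections.emptyList(), warnings); // no warnings expected
}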
View File: ConnectorConfig.java

@ -115,6 +115,16 @@ public class ConnectorConfig extends AbstractConfig {
private static final String TASK_MAX_DISPLAY = "Tasks max";
public static final String TASKS_MAX_ENFORCE_CONFIG = "tasks.max.enforce";
private static final String TASKS_MAX_ENFORCE_DOC =
"(Deprecated) Whether to enforce that the tasks.max property is respected by the connector. "
+ "By default, connectors that generate too many tasks will fail, and existing sets of tasks that exceed the tasks.max property will also be failed. "
+ "If this property is set to false, then connectors will be allowed to generate more than the maximum number of tasks, and existing sets of "
+ "tasks that exceed the tasks.max property will be allowed to run. "
+ "This property is deprecated and will be removed in an upcoming major release.";
public static final boolean TASKS_MAX_ENFORCE_DEFAULT = true;
private static final String TASKS_MAX_ENFORCE_DISPLAY = "Enforce tasks max";
public static final String TRANSFORMS_CONFIG = "transforms";
private static final String TRANSFORMS_DOC = "Aliases for the transformations to be applied to records.";
private static final String TRANSFORMS_DISPLAY = "Transforms";
@ -195,6 +205,7 @@ public class ConnectorConfig extends AbstractConfig {
.define(NAME_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, nonEmptyStringWithoutControlChars(), Importance.HIGH, NAME_DOC, COMMON_GROUP, ++orderInGroup, Width.MEDIUM, NAME_DISPLAY)
.define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.LONG, CONNECTOR_CLASS_DISPLAY)
.define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, TASK_MAX_DISPLAY)
.define(TASKS_MAX_ENFORCE_CONFIG, Type.BOOLEAN, TASKS_MAX_ENFORCE_DEFAULT, Importance.LOW, TASKS_MAX_ENFORCE_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, TASKS_MAX_ENFORCE_DISPLAY)
.define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, null, KEY_CONVERTER_CLASS_VALIDATOR, Importance.LOW, KEY_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, KEY_CONVERTER_CLASS_DISPLAY)
.define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, null, VALUE_CONVERTER_CLASS_VALIDATOR, Importance.LOW, VALUE_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, VALUE_CONVERTER_CLASS_DISPLAY)
.define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS, HEADER_CONVERTER_CLASS_DEFAULT, HEADER_CONVERTER_CLASS_VALIDATOR, Importance.LOW, HEADER_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, HEADER_CONVERTER_CLASS_DISPLAY)
@ -281,6 +292,14 @@ public class ConnectorConfig extends AbstractConfig {
return getBoolean(ERRORS_LOG_INCLUDE_MESSAGES_CONFIG);
}
public int tasksMax() {
return getInt(TASKS_MAX_CONFIG);
}
public boolean enforceTasksMax() {
return getBoolean(TASKS_MAX_ENFORCE_CONFIG);
}
/**
* Returns the initialized list of {@link TransformationStage} which apply the
* {@link Transformation transformations} and {@link Predicate predicates}

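Taken together, the new property and accessors let a caller make the enforcement decision from a single config object. A minimal sketch, assuming an existing Plugins instance named plugins; the connector name and class below are illustrative:

Map<String, String> props = new HashMap<>();
props.put("name", "example-source");                 // hypothetical connector name
props.put("connector.class", "com.example.Source");  // hypothetical connector class
props.put("tasks.max", "4");
props.put("tasks.max.enforce", "true");              // the default, shown for clarity

ConnectorConfig config = new ConnectorConfig(plugins, props);
int maxTasks = config.tasksMax();            // 4
boolean enforce = config.enforceTasksMax();  // true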
View File: TooManyTasksException.java

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime;
import org.apache.kafka.connect.errors.ConnectException;
/**
* Thrown when a connector has generated too many task configs (i.e., more tasks than
* the value for {@link ConnectorConfig#TASKS_MAX_CONFIG tasks.max} that it has
* been configured with).
*/
public class TooManyTasksException extends ConnectException {
public TooManyTasksException(String connName, int numTasks, int maxTasks) {
super(String.format(
"The connector %s has generated %d tasks, which is greater than %d, "
+ "the maximum number of tasks it is configured to create. "
+ "This behaviour should be considered a bug and is disallowed. "
+ "If necessary, it can be permitted by reconfiguring the connector "
+ "with '%s' set to false; however, this option will be removed in a "
+ "future release of Kafka Connect.",
connName,
numTasks,
maxTasks,
ConnectorConfig.TASKS_MAX_ENFORCE_CONFIG
));
}
}

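As the exception message suggests, the deprecated escape hatch is to set tasks.max.enforce to false on the offending connector. A sketch of the opt-out in the style of the integration tests below, assuming connect is an embedded cluster and connectorProps holds the connector's current configuration:

// Excess tasks will now produce a WARN log in the worker instead of failing the connector
connectorProps.put(ConnectorConfig.TASKS_MAX_ENFORCE_CONFIG, "false");
connect.configureConnector(CONNECTOR_NAME, connectorProps);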
View File: Worker.java

@ -389,13 +389,21 @@ public class Worker {
if (workerConnector == null)
throw new ConnectException("Connector " + connName + " not found in this worker.");
int maxTasks = connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG);
int maxTasks = connConfig.tasksMax();
Map<String, String> connOriginals = connConfig.originalsStrings();
Connector connector = workerConnector.connector();
try (LoaderSwap loaderSwap = plugins.withClassLoader(workerConnector.loader())) {
String taskClassName = connector.taskClass().getName();
for (Map<String, String> taskProps : connector.taskConfigs(maxTasks)) {
List<Map<String, String>> taskConfigs = connector.taskConfigs(maxTasks);
try {
checkTasksMax(connName, taskConfigs.size(), maxTasks, connConfig.enforceTasksMax());
} catch (TooManyTasksException e) {
// TODO: This control flow is awkward. Push task config generation into WorkerConnector class?
workerConnector.fail(e);
throw e;
}
for (Map<String, String> taskProps : taskConfigs) {
// Ensure we don't modify the connector's copy of the config
Map<String, String> taskConfig = new HashMap<>(taskProps);
taskConfig.put(TaskConfig.TASK_CLASS_CONFIG, taskClassName);
@ -413,6 +421,26 @@ public class Worker {
return result;
}
private void checkTasksMax(String connName, int numTasks, int maxTasks, boolean enforce) {
if (numTasks > maxTasks) {
if (enforce) {
throw new TooManyTasksException(connName, numTasks, maxTasks);
} else {
log.warn(
"The connector {} has generated {} tasks, which is greater than {}, "
+ "the maximum number of tasks it is configured to create. "
+ "This behavior should be considered a bug and will be disallowed "
+ "in future releases of Kafka Connect. Please report this to the "
+ "maintainers of the connector and request that they adjust their "
+ "connector's taskConfigs() method to respect the maxTasks parameter.",
connName,
numTasks,
maxTasks
);
}
}
}
/**
* Stop a connector managed by this worker.
*
@ -522,12 +550,12 @@ public class Worker {
/**
* Start a sink task managed by this worker.
*
* @param id the task ID.
* @param configState the most recent {@link ClusterConfigState} known to the worker
* @param connProps the connector properties.
* @param taskProps the tasks properties.
* @param id the task ID.
* @param configState the most recent {@link ClusterConfigState} known to the worker
* @param connProps the connector properties.
* @param taskProps the tasks properties.
* @param statusListener a listener for the runtime status transitions of the task.
* @param initialState the initial state of the connector.
* @param initialState the initial state of the connector.
* @return true if the task started successfully.
*/
public boolean startSinkTask(
@ -538,19 +566,19 @@ public class Worker {
TaskStatus.Listener statusListener,
TargetState initialState
) {
return startTask(id, connProps, taskProps, statusListener,
return startTask(id, connProps, taskProps, configState, statusListener,
new SinkTaskBuilder(id, configState, statusListener, initialState));
}
/**
* Start a source task managed by this worker using older behavior that does not provide exactly-once support.
*
* @param id the task ID.
* @param configState the most recent {@link ClusterConfigState} known to the worker
* @param connProps the connector properties.
* @param taskProps the tasks properties.
* @param id the task ID.
* @param configState the most recent {@link ClusterConfigState} known to the worker
* @param connProps the connector properties.
* @param taskProps the tasks properties.
* @param statusListener a listener for the runtime status transitions of the task.
* @param initialState the initial state of the connector.
* @param initialState the initial state of the connector.
* @return true if the task started successfully.
*/
public boolean startSourceTask(
@ -561,20 +589,20 @@ public class Worker {
TaskStatus.Listener statusListener,
TargetState initialState
) {
return startTask(id, connProps, taskProps, statusListener,
return startTask(id, connProps, taskProps, configState, statusListener,
new SourceTaskBuilder(id, configState, statusListener, initialState));
}
/**
* Start a source task with exactly-once support managed by this worker.
*
* @param id the task ID.
* @param configState the most recent {@link ClusterConfigState} known to the worker
* @param connProps the connector properties.
* @param taskProps the tasks properties.
* @param statusListener a listener for the runtime status transitions of the task.
* @param initialState the initial state of the connector.
* @param preProducerCheck a preflight check that should be performed before the task initializes its transactional producer.
* @param id the task ID.
* @param configState the most recent {@link ClusterConfigState} known to the worker
* @param connProps the connector properties.
* @param taskProps the tasks properties.
* @param statusListener a listener for the runtime status transitions of the task.
* @param initialState the initial state of the connector.
* @param preProducerCheck a preflight check that should be performed before the task initializes its transactional producer.
* @param postProducerCheck a preflight check that should be performed after the task initializes its transactional producer,
* but before producing any source records or offsets.
* @return true if the task started successfully.
@ -589,7 +617,7 @@ public class Worker {
Runnable preProducerCheck,
Runnable postProducerCheck
) {
return startTask(id, connProps, taskProps, statusListener,
return startTask(id, connProps, taskProps, configState, statusListener,
new ExactlyOnceSourceTaskBuilder(id, configState, statusListener, initialState, preProducerCheck, postProducerCheck));
}
@ -599,6 +627,7 @@ public class Worker {
* @param id the task ID.
* @param connProps the connector properties.
* @param taskProps the tasks properties.
* @param configState the most recent {@link ClusterConfigState} known to the worker
* @param statusListener a listener for the runtime status transitions of the task.
* @param taskBuilder the {@link TaskBuilder} used to create the {@link WorkerTask} that manages the lifecycle of the task.
* @return true if the task started successfully.
@ -607,6 +636,7 @@ public class Worker {
ConnectorTaskId id,
Map<String, String> connProps,
Map<String, String> taskProps,
ClusterConfigState configState,
TaskStatus.Listener statusListener,
TaskBuilder<?, ?> taskBuilder
) {
@ -624,6 +654,11 @@ public class Worker {
try (LoaderSwap loaderSwap = plugins.withClassLoader(connectorLoader)) {
final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps);
int maxTasks = connConfig.tasksMax();
int numTasks = configState.taskCount(id.connector());
checkTasksMax(id.connector(), numTasks, maxTasks, connConfig.enforceTasksMax());
final TaskConfig taskConfig = new TaskConfig(taskProps);
final Class<? extends Task> taskClass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class);
final Task task = plugins.newTask(taskClass);

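The worker now enforces the limit at two points: once when the connector generates task configs (connectorTaskConfigs, above), and again when each individual task starts, by comparing the committed task count from configState.taskCount() against tasks.max. A self-contained sketch of the shared decision rule; the helper name is illustrative, not the worker's API:

// Mirrors the worker's checkTasksMax decision: fail fast when enforced, warn otherwise
static void checkTasksMaxSketch(String connName, int numTasks, int maxTasks, boolean enforce) {
    if (numTasks <= maxTasks)
        return;                                       // within bounds; nothing to do
    if (enforce)
        throw new TooManyTasksException(connName, numTasks, maxTasks);
    // Enforcement disabled: surface the problem but let the tasks run
    System.err.printf("WARN: connector %s generated %d tasks, max is %d%n",
            connName, numTasks, maxTasks);
}

// With tasks.max = 2 and three generated task configs:
//   checkTasksMaxSketch("example", 3, 2, true)  -> throws TooManyTasksException
//   checkTasksMaxSketch("example", 3, 2, false) -> warns; all three tasks still run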
View File: WorkerConnector.java

@ -73,6 +73,7 @@ public class WorkerConnector implements Runnable {
private final AtomicReference<TargetState> pendingTargetStateChange;
private final AtomicReference<Callback<TargetState>> pendingStateChangeCallback;
private final CountDownLatch shutdownLatch;
private volatile Throwable externalFailure;
private volatile boolean stopping; // indicates whether the Worker has asked the connector to stop
private volatile boolean cancelled; // indicates whether the Worker has cancelled the connector (e.g. because of slow shutdown)
@ -102,6 +103,7 @@ public class WorkerConnector implements Runnable {
this.pendingTargetStateChange = new AtomicReference<>();
this.pendingStateChangeCallback = new AtomicReference<>();
this.shutdownLatch = new CountDownLatch(1);
this.externalFailure = null;
this.stopping = false;
this.cancelled = false;
}
@ -131,9 +133,27 @@ public class WorkerConnector implements Runnable {
}
}
/**
* Fail the connector.
* @param cause the cause of the failure; if the connector has already been failed externally, subsequent calls have no effect
*/
public void fail(Throwable cause) {
synchronized (this) {
if (this.externalFailure != null)
return;
log.error("{} Connector has failed", this, cause);
this.externalFailure = cause;
notify();
}
}
void doRun() {
initialize();
while (!stopping) {
Throwable failure = externalFailure;
if (failure != null)
onFailure(failure);
TargetState newTargetState;
Callback<TargetState> stateChangeCallback;
synchronized (this) {
@ -144,7 +164,10 @@ public class WorkerConnector implements Runnable {
doTransitionTo(newTargetState, stateChangeCallback);
}
synchronized (this) {
if (pendingTargetStateChange.get() != null || stopping) {
if (pendingTargetStateChange.get() != null
|| (!State.FAILED.equals(state) && externalFailure != null)
|| stopping
) {
// An update occurred before we entered the synchronized block; no big deal,
// just start the loop again until we've handled everything
} else {
@ -203,7 +226,11 @@ public class WorkerConnector implements Runnable {
}
}
private void onFailure(Throwable t) {
private synchronized void onFailure(Throwable t) {
// If we've already failed, we don't overwrite the last-reported cause of failure
if (this.state == State.FAILED)
return;
statusListener.onFailure(connName, t);
this.state = State.FAILED;
}

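The externalFailure field lets another thread (here, the Worker) inject a failure into the connector's run loop: fail() records only the first cause and notify()s the loop, doRun() observes the volatile field at the top of each iteration, and the re-check inside the synchronized block guards against a lost wakeup when a failure lands between the check and the wait(). A minimal standalone sketch of the same hand-off pattern; class, field, and method names are illustrative:

class RunLoop implements Runnable {
    private volatile Throwable externalFailure; // written by other threads, read by the loop
    private volatile boolean stopping;
    private boolean failed;                     // touched only by the loop thread

    // Called from another thread; only the first reported failure is retained
    synchronized void fail(Throwable cause) {
        if (externalFailure != null)
            return;
        externalFailure = cause;
        notify(); // wake the loop if it is wait()ing
    }

    synchronized void stop() {
        stopping = true;
        notify();
    }

    @Override
    public void run() {
        while (!stopping) {
            Throwable failure = externalFailure;
            if (failure != null && !failed) {
                failed = true; // transition to FAILED exactly once; keep looping until stopped
            }
            synchronized (this) {
                // Re-check before sleeping: a failure or stop request may have arrived
                // after the checks above but before we acquired the lock
                if ((externalFailure != null && !failed) || stopping)
                    continue;
                try {
                    wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}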
View File: DistributedHerder.java

@ -50,6 +50,7 @@ import org.apache.kafka.connect.runtime.SinkConnectorConfig;
import org.apache.kafka.connect.runtime.SourceConnectorConfig;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.TooManyTasksException;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
import org.apache.kafka.connect.runtime.rest.RestClient;
@ -154,7 +155,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
private static final long FORWARD_REQUEST_SHUTDOWN_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10);
private static final long START_AND_STOP_SHUTDOWN_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(1);
private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_INITIAL_MS = 250;
private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MAX_MS = 60000;
static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MAX_MS = 60000;
private static final long CONFIG_TOPIC_WRITE_PRIVILEGES_BACKOFF_MS = 250;
private static final int START_STOP_THREAD_POOL_SIZE = 8;
private static final short BACKOFF_RETRIES = 5;
@ -2139,6 +2140,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
if (error != null) {
if (isPossibleExpiredKeyException(initialRequestTime, error)) {
log.debug("Failed to reconfigure connector's tasks ({}), possibly due to expired session key. Retrying after backoff", connName);
} else if (error instanceof TooManyTasksException) {
log.debug("Connector {} generated too many tasks; will not retry configuring connector", connName);
return;
} else {
log.error("Failed to reconfigure connector's tasks ({}), retrying after backoff.", connName, error);
}

View File: BlockingConnectorTest.java

@ -62,6 +62,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
@ -146,6 +147,7 @@ public class BlockingConnectorTest {
connect.stop();
// unblock everything so that we don't leak threads after each test run
Block.reset();
Block.join();
}
@Test
@ -442,11 +444,29 @@ public class BlockingConnectorTest {
resetAwaitBlockLatch();
BLOCK_LATCHES.forEach(CountDownLatch::countDown);
BLOCK_LATCHES.clear();
}
/**
* {@link Thread#join(long) Await} the termination of all threads that have been
* intentionally blocked since the last invocation of this method or, if this method
* has never been invoked, of all threads that have ever been blocked.
*/
public static synchronized void join() {
BLOCKED_THREADS.forEach(t -> {
try {
t.join(30_000);
if (t.isAlive()) {
log.warn("Thread {} failed to finish in time", t);
log.warn(
"Thread {} failed to finish in time; current stack trace:\n{}",
t,
Stream.of(t.getStackTrace())
.map(s -> String.format(
"\t%s.%s:%d",
s.getClassName(),
s.getMethodName(),
s.getLineNumber()
)).collect(Collectors.joining("\n"))
);
}
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted while waiting for blocked thread " + t + " to finish");
@ -777,7 +797,7 @@ public class BlockingConnectorTest {
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
return IntStream.rangeClosed(0, maxTasks)
return IntStream.range(0, maxTasks)
.mapToObj(i -> new HashMap<>(props))
.collect(Collectors.toList());
}

View File: ConnectWorkerIntegrationTest.java

@ -18,12 +18,17 @@ package org.apache.kafka.connect.integration;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.WorkerHandle;
import org.apache.kafka.test.IntegrationTest;
@ -54,6 +59,7 @@ import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_C
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX;
import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_ENFORCE_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
@ -881,6 +887,159 @@ public class ConnectWorkerIntegrationTest {
connect.requestTimeout(DEFAULT_REST_REQUEST_TIMEOUT_MS);
}
/**
* Tests the logic around enforcement of the
* {@link org.apache.kafka.connect.runtime.ConnectorConfig#TASKS_MAX_CONFIG tasks.max}
* property and how it can be toggled via the
* {@link org.apache.kafka.connect.runtime.ConnectorConfig#TASKS_MAX_ENFORCE_CONFIG tasks.max.enforce}
* property, following the test plan laid out in
* <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1004%3A+Enforce+tasks.max+property+in+Kafka+Connect#KIP1004:Enforcetasks.maxpropertyinKafkaConnect-TestPlan">KIP-1004</a>.
*/
@Test
public void testTasksMaxEnforcement() throws Exception {
String configTopic = "tasks-max-enforcement-configs";
workerProps.put(CONFIG_TOPIC_CONFIG, configTopic);
connect = connectBuilder.build();
// start the clusters
connect.start();
connect.assertions().assertAtLeastNumWorkersAreUp(
NUM_WORKERS,
"Initial group of workers did not start in time."
);
Map<String, String> connectorProps = defaultSourceConnectorProps(TOPIC_NAME);
int maxTasks = 1;
connectorProps.put(TASKS_MAX_CONFIG, Integer.toString(maxTasks));
int numTasks = 2;
connectorProps.put(MonitorableSourceConnector.NUM_TASKS, Integer.toString(numTasks));
connect.configureConnector(CONNECTOR_NAME, connectorProps);
// A connector that generates excessive tasks will be failed with an expected error message
connect.assertions().assertConnectorIsFailedAndTasksHaveFailed(
CONNECTOR_NAME,
0,
"connector did not fail in time"
);
String expectedErrorSnippet = String.format(
"The connector %s has generated %d tasks, which is greater than %d, "
+ "the maximum number of tasks it is configured to create. ",
CONNECTOR_NAME,
numTasks,
maxTasks
);
String errorMessage = connect.connectorStatus(CONNECTOR_NAME).connector().trace();
assertThat(errorMessage, containsString(expectedErrorSnippet));
// Stop all workers in the cluster
connect.workers().forEach(connect::removeWorker);
// Publish a set of too many task configs to the config topic, to simulate
// an existing set of task configs that was written before the cluster was upgraded
try (JsonConverter converter = new JsonConverter()) {
converter.configure(
Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"),
false
);
for (int i = 0; i < numTasks; i++) {
Map<String, String> taskConfig = MonitorableSourceConnector.taskConfig(
connectorProps,
CONNECTOR_NAME,
i
);
Struct wrappedTaskConfig = new Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0)
.put("properties", taskConfig);
String key = KafkaConfigBackingStore.TASK_KEY(new ConnectorTaskId(CONNECTOR_NAME, i));
byte[] value = converter.fromConnectData(
configTopic,
KafkaConfigBackingStore.TASK_CONFIGURATION_V0,
wrappedTaskConfig
);
connect.kafka().produce(configTopic, key, new String(value));
}
Struct taskCommitMessage = new Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0);
taskCommitMessage.put("tasks", numTasks);
String key = KafkaConfigBackingStore.COMMIT_TASKS_KEY(CONNECTOR_NAME);
byte[] value = converter.fromConnectData(
configTopic,
KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0,
taskCommitMessage
);
connect.kafka().produce(configTopic, key, new String(value));
}
// Restart all the workers in the cluster
for (int i = 0; i < NUM_WORKERS; i++)
connect.addWorker();
// An existing set of tasks that exceeds the tasks.max property
// will be failed with an expected error message
connect.assertions().assertConnectorIsFailedAndTasksHaveFailed(
CONNECTOR_NAME,
numTasks,
"connector and tasks did not fail in time"
);
connectorProps.put(TASKS_MAX_ENFORCE_CONFIG, "false");
connect.configureConnector(CONNECTOR_NAME, connectorProps);
// That same existing set of tasks will be allowed to run
// once the connector is reconfigured with tasks.max.enforce set to false
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
CONNECTOR_NAME,
numTasks,
"connector and tasks did not start in time"
);
numTasks++;
connectorProps.put(MonitorableSourceConnector.NUM_TASKS, Integer.toString(numTasks));
connect.configureConnector(CONNECTOR_NAME, connectorProps);
// A connector will be allowed to generate excessive tasks when tasks.max.enforce is set to false
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
CONNECTOR_NAME,
numTasks,
"connector and tasks did not start in time"
);
numTasks = maxTasks;
connectorProps.put(MonitorableSourceConnector.NUM_TASKS, Integer.toString(numTasks));
connectorProps.put(TASKS_MAX_ENFORCE_CONFIG, "true");
connect.configureConnector(CONNECTOR_NAME, connectorProps);
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
CONNECTOR_NAME,
numTasks,
"connector and tasks did not start in time"
);
numTasks = maxTasks + 1;
connectorProps.put(MonitorableSourceConnector.NUM_TASKS, Integer.toString(numTasks));
connect.configureConnector(CONNECTOR_NAME, connectorProps);
// A connector that generates excessive tasks after being reconfigured will be failed, but its existing tasks will continue running
connect.assertions().assertConnectorIsFailedAndNumTasksAreRunning(
CONNECTOR_NAME,
maxTasks,
"connector did not fail in time, or tasks were incorrectly failed"
);
// Make sure that the tasks have had a chance to fail (i.e., that the worker has been given
// a chance to check on the number of tasks for the connector during task startup)
for (int i = 0; i < maxTasks; i++)
connect.restartTask(CONNECTOR_NAME, i);
// Verify one more time that none of the tasks have actually failed
connect.assertions().assertConnectorIsFailedAndNumTasksAreRunning(
CONNECTOR_NAME,
maxTasks,
"connector did not fail in time, or tasks were incorrectly failed"
);
}
private Map<String, String> defaultSourceConnectorProps(String topic) {
// setup props for the source connector
Map<String, String> props = new HashMap<>();

View File: MonitorableSourceConnector.java

@ -51,6 +51,7 @@ public class MonitorableSourceConnector extends SampleSourceConnector {
private static final Logger log = LoggerFactory.getLogger(MonitorableSourceConnector.class);
public static final String TOPIC_CONFIG = "topic";
public static final String NUM_TASKS = "num.tasks";
public static final String MESSAGES_PER_POLL_CONFIG = "messages.per.poll";
public static final String MAX_MESSAGES_PER_SECOND_CONFIG = "throughput";
public static final String MAX_MESSAGES_PRODUCED_CONFIG = "max.messages";
@ -93,16 +94,27 @@ public class MonitorableSourceConnector extends SampleSourceConnector {
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
String numTasksProp = commonConfigs.get(NUM_TASKS);
int numTasks = numTasksProp != null ? Integer.parseInt(numTasksProp) : maxTasks;
List<Map<String, String>> configs = new ArrayList<>();
for (int i = 0; i < maxTasks; i++) {
Map<String, String> config = new HashMap<>(commonConfigs);
config.put("connector.name", connectorName);
config.put("task.id", taskId(connectorName, i));
for (int i = 0; i < numTasks; i++) {
Map<String, String> config = taskConfig(commonConfigs, connectorName, i);
configs.add(config);
}
return configs;
}
public static Map<String, String> taskConfig(
Map<String, String> connectorProps,
String connectorName,
int taskNum
) {
Map<String, String> result = new HashMap<>(connectorProps);
result.put("connector.name", connectorName);
result.put("task.id", taskId(connectorName, taskNum));
return result;
}
@Override
public void stop() {
log.info("Stopped {} connector {}", this.getClass().getSimpleName(), connectorName);

View File: OffsetsApiIntegrationTest.java

@ -119,6 +119,8 @@ public class OffsetsApiIntegrationTest {
public static void close() {
// stop all Connect, Kafka and Zk threads.
CONNECT_CLUSTERS.values().forEach(EmbeddedConnectCluster::stop);
// wait for all blocked threads created while testing zombie task scenarios to finish
BlockingConnectorTest.Block.join();
}
private static EmbeddedConnectCluster createOrReuseConnectWithWorkerProps(Map<String, String> workerProps) {
@ -469,6 +471,7 @@ public class OffsetsApiIntegrationTest {
() -> connect.alterConnectorOffsets(connectorName, new ConnectorOffsets(offsetsToAlter)));
assertThat(e.getMessage(), containsString("zombie sink task"));
// clean up blocked threads created while testing zombie task scenarios
BlockingConnectorTest.Block.reset();
}
@ -810,6 +813,7 @@ public class OffsetsApiIntegrationTest {
ConnectRestException e = assertThrows(ConnectRestException.class, () -> connect.resetConnectorOffsets(connectorName));
assertThat(e.getMessage(), containsString("zombie sink task"));
// clean up blocked threads created while testing zombie task scenarios
BlockingConnectorTest.Block.reset();
}

View File: AbstractHerderTest.java

@ -470,8 +470,8 @@ public class AbstractHerderTest {
assertEquals(2, result.errorCount());
Map<String, ConfigInfo> infos = result.values().stream()
.collect(Collectors.toMap(info -> info.configKey().name(), Function.identity()));
// Base connector config has 14 fields, connector's configs add 7
assertEquals(21, infos.size());
// Base connector config has 15 fields, connector's configs add 7
assertEquals(22, infos.size());
// Missing name should generate an error
assertEquals(ConnectorConfig.NAME_CONFIG,
infos.get(ConnectorConfig.NAME_CONFIG).configValue().name());
@ -582,7 +582,7 @@ public class AbstractHerderTest {
assertEquals(1, result.errorCount());
Map<String, ConfigInfo> infos = result.values().stream()
.collect(Collectors.toMap(info -> info.configKey().name(), Function.identity()));
assertEquals(26, infos.size());
assertEquals(27, infos.size());
// Should get 2 type fields from the transforms, first adds its own config since it has a valid class
assertEquals("transforms.xformA.type",
infos.get("transforms.xformA.type").configValue().name());
@ -639,7 +639,7 @@ public class AbstractHerderTest {
assertEquals(1, result.errorCount());
Map<String, ConfigInfo> infos = result.values().stream()
.collect(Collectors.toMap(info -> info.configKey().name(), Function.identity()));
assertEquals(28, infos.size());
assertEquals(29, infos.size());
// Should get 2 type fields from the transforms, first adds its own config since it has a valid class
assertEquals("transforms.xformA.type", infos.get("transforms.xformA.type").configValue().name());
assertTrue(infos.get("transforms.xformA.type").configValue().errors().isEmpty());
@ -700,8 +700,8 @@ public class AbstractHerderTest {
);
assertEquals(expectedGroups, result.groups());
assertEquals(1, result.errorCount());
// Base connector config has 14 fields, connector's configs add 7, and 2 producer overrides
assertEquals(23, result.values().size());
// Base connector config has 15 fields, connector's configs add 7, and 2 producer overrides
assertEquals(24, result.values().size());
assertTrue(result.values().stream().anyMatch(
configInfo -> ackConfigKey.equals(configInfo.configValue().name()) && !configInfo.configValue().errors().isEmpty()));
assertTrue(result.values().stream().anyMatch(

View File: WorkerTest.java

@ -42,6 +42,7 @@ import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@ -130,6 +131,8 @@ import static org.apache.kafka.connect.json.JsonConverterConfig.SCHEMAS_ENABLE_C
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX;
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX;
import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_ENFORCE_CONFIG;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
@ -619,7 +622,21 @@ public class WorkerTest {
assertStatistics(worker, 0, 0);
assertEquals(Collections.emptySet(), worker.taskIds());
worker.startSourceTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED);
Map<String, String> connectorConfigs = anyConnectorConfigMap();
ClusterConfigState configState = new ClusterConfigState(
0,
null,
Collections.singletonMap(CONNECTOR_ID, 1),
Collections.singletonMap(CONNECTOR_ID, connectorConfigs),
Collections.singletonMap(CONNECTOR_ID, TargetState.STARTED),
Collections.singletonMap(TASK_ID, origProps),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptySet(),
Collections.emptySet()
);
assertTrue(worker.startSourceTask(TASK_ID, configState, connectorConfigs, origProps, taskStatusListener, TargetState.STARTED));
assertStatistics(worker, 0, 1);
assertEquals(Collections.singleton(TASK_ID), worker.taskIds());
worker.stopAndAwaitTask(TASK_ID);
@ -662,7 +679,19 @@ public class WorkerTest {
connectorConfigs.put(TOPICS_CONFIG, "t1");
connectorConfigs.put(CONNECTOR_CLASS_CONFIG, SampleSinkConnector.class.getName());
worker.startSinkTask(TASK_ID, ClusterConfigState.EMPTY, connectorConfigs, origProps, taskStatusListener, TargetState.STARTED);
ClusterConfigState configState = new ClusterConfigState(
0,
null,
Collections.singletonMap(CONNECTOR_ID, 1),
Collections.singletonMap(CONNECTOR_ID, connectorConfigs),
Collections.singletonMap(CONNECTOR_ID, TargetState.STARTED),
Collections.singletonMap(TASK_ID, origProps),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptySet(),
Collections.emptySet()
);
assertTrue(worker.startSinkTask(TASK_ID, configState, connectorConfigs, origProps, taskStatusListener, TargetState.STARTED));
assertStatistics(worker, 0, 1);
assertEquals(Collections.singleton(TASK_ID), worker.taskIds());
worker.stopAndAwaitTask(TASK_ID);
@ -718,7 +747,22 @@ public class WorkerTest {
assertStatistics(worker, 0, 0);
assertEquals(Collections.emptySet(), worker.taskIds());
worker.startExactlyOnceSourceTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED, preProducer, postProducer);
Map<String, String> connectorConfigs = anyConnectorConfigMap();
ClusterConfigState configState = new ClusterConfigState(
0,
null,
Collections.singletonMap(CONNECTOR_ID, 1),
Collections.singletonMap(CONNECTOR_ID, connectorConfigs),
Collections.singletonMap(CONNECTOR_ID, TargetState.STARTED),
Collections.singletonMap(TASK_ID, origProps),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptySet(),
Collections.emptySet()
);
assertTrue(worker.startExactlyOnceSourceTask(TASK_ID, configState, connectorConfigs, origProps, taskStatusListener, TargetState.STARTED, preProducer, postProducer));
assertStatistics(worker, 0, 1);
assertEquals(Collections.singleton(TASK_ID), worker.taskIds());
worker.stopAndAwaitTask(TASK_ID);
@ -784,7 +828,7 @@ public class WorkerTest {
ClusterConfigState.EMPTY,
anyConnectorConfigMap(),
origProps,
taskStatusListener,
taskStatusListener,
TargetState.STARTED);
assertStatusMetrics(1L, "connector-running-task-count");
@ -2522,6 +2566,228 @@ public class WorkerTest {
verifyKafkaClusterId();
}
@Test
public void testConnectorGeneratesTooManyTasksButMaxNotEnforced() throws Exception {
testConnectorGeneratesTooManyTasks(false);
}
@Test
public void testConnectorGeneratesTooManyTasksAndMaxEnforced() throws Exception {
testConnectorGeneratesTooManyTasks(true);
}
private void testConnectorGeneratesTooManyTasks(boolean enforced) throws Exception {
mockKafkaClusterId();
String connectorClass = SampleSourceConnector.class.getName();
connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorClass);
connectorProps.put(TASKS_MAX_ENFORCE_CONFIG, Boolean.toString(enforced));
mockConnectorIsolation(connectorClass, sourceConnector);
mockExecutorRealSubmit(WorkerConnector.class);
// Use doReturn().when() syntax due to when().thenReturn() not being able to return wildcard generic types
doReturn(SampleSourceConnector.SampleSourceTask.class).when(sourceConnector).taskClass();
worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, allConnectorClientConfigOverridePolicy);
worker.start();
FutureCallback<TargetState> onFirstStart = new FutureCallback<>();
worker.startConnector(CONNECTOR_ID, connectorProps, ctx, connectorStatusListener, TargetState.STARTED, onFirstStart);
// Wait for the connector to actually start
assertEquals(TargetState.STARTED, onFirstStart.get(1000, TimeUnit.MILLISECONDS));
Map<String, String> taskConfig = new HashMap<>();
// No warnings or exceptions when a connector generates an empty list of task configs
when(sourceConnector.taskConfigs(1)).thenReturn(Arrays.asList());
try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Worker.class)) {
connectorProps.put(TASKS_MAX_CONFIG, "1");
List<Map<String, String>> taskConfigs = worker.connectorTaskConfigs(CONNECTOR_ID, new ConnectorConfig(plugins, connectorProps));
assertEquals(0, taskConfigs.size());
assertEquals(Collections.emptyList(), logCaptureAppender.getMessages("WARN"));
}
// No warnings or exceptions when a connector generates the maximum permitted number of task configs
when(sourceConnector.taskConfigs(1)).thenReturn(Arrays.asList(taskConfig));
when(sourceConnector.taskConfigs(2)).thenReturn(Arrays.asList(taskConfig, taskConfig));
when(sourceConnector.taskConfigs(3)).thenReturn(Arrays.asList(taskConfig, taskConfig, taskConfig));
try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Worker.class)) {
connectorProps.put(TASKS_MAX_CONFIG, "1");
List<Map<String, String>> taskConfigs = worker.connectorTaskConfigs(CONNECTOR_ID, new ConnectorConfig(plugins, connectorProps));
assertEquals(1, taskConfigs.size());
connectorProps.put(TASKS_MAX_CONFIG, "2");
taskConfigs = worker.connectorTaskConfigs(CONNECTOR_ID, new ConnectorConfig(plugins, connectorProps));
assertEquals(2, taskConfigs.size());
connectorProps.put(TASKS_MAX_CONFIG, "3");
taskConfigs = worker.connectorTaskConfigs(CONNECTOR_ID, new ConnectorConfig(plugins, connectorProps));
assertEquals(3, taskConfigs.size());
assertEquals(Collections.emptyList(), logCaptureAppender.getMessages("WARN"));
assertEquals(Collections.emptyList(), logCaptureAppender.getMessages("ERROR"));
}
// Warning/exception when a connector generates too many task configs
List<Map<String, String>> tooManyTaskConfigs = Arrays.asList(taskConfig, taskConfig, taskConfig, taskConfig);
when(sourceConnector.taskConfigs(1)).thenReturn(tooManyTaskConfigs);
when(sourceConnector.taskConfigs(2)).thenReturn(tooManyTaskConfigs);
when(sourceConnector.taskConfigs(3)).thenReturn(tooManyTaskConfigs);
for (int i = 0; i < 3; i++) {
try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Worker.class)) {
int tasksMax = i + 1;
connectorProps.put(TASKS_MAX_CONFIG, Integer.toString(tasksMax));
String tasksMaxExceededMessage;
if (enforced) {
TooManyTasksException e = assertThrows(
TooManyTasksException.class,
() -> worker.connectorTaskConfigs(
CONNECTOR_ID,
new ConnectorConfig(plugins, connectorProps)
)
);
tasksMaxExceededMessage = e.getMessage();
} else {
List<Map<String, String>> taskConfigs = worker.connectorTaskConfigs(
CONNECTOR_ID,
new ConnectorConfig(plugins, connectorProps)
);
assertEquals(tooManyTaskConfigs.size(), taskConfigs.size());
List<String> warningMessages = logCaptureAppender.getMessages("WARN");
assertEquals(1, warningMessages.size());
tasksMaxExceededMessage = warningMessages.get(0);
}
assertTasksMaxExceededMessage(
CONNECTOR_ID,
tooManyTaskConfigs.size(), tasksMax,
tasksMaxExceededMessage
);
// Regardless of enforcement, there should never be any error-level log messages
assertEquals(Collections.emptyList(), logCaptureAppender.getMessages("ERROR"));
}
}
// One last sanity check in case the connector is reconfigured and respects tasks.max
when(sourceConnector.taskConfigs(1)).thenReturn(Arrays.asList(taskConfig));
try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Worker.class)) {
connectorProps.put(TASKS_MAX_CONFIG, "1");
List<Map<String, String>> taskConfigs = worker.connectorTaskConfigs(CONNECTOR_ID, new ConnectorConfig(plugins, connectorProps));
assertEquals(1, taskConfigs.size());
assertEquals(Collections.emptyList(), logCaptureAppender.getMessages("WARN"));
assertEquals(Collections.emptyList(), logCaptureAppender.getMessages("ERROR"));
}
worker.stop();
}
@Test
public void testStartTaskWithTooManyTaskConfigsButMaxNotEnforced() {
testStartTaskWithTooManyTaskConfigs(false);
}
@Test
public void testStartTaskWithTooManyTaskConfigsAndMaxEnforced() {
testStartTaskWithTooManyTaskConfigs(true);
}
private void testStartTaskWithTooManyTaskConfigs(boolean enforced) {
SinkTask task = mock(TestSinkTask.class);
mockKafkaClusterId();
Map<String, String> origProps = Collections.singletonMap(TaskConfig.TASK_CLASS_CONFIG, TestSinkTask.class.getName());
worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService,
noneConnectorClientConfigOverridePolicy, null);
worker.herder = herder;
worker.start();
assertStatistics(worker, 0, 0);
assertEquals(Collections.emptySet(), worker.taskIds());
Map<String, String> connectorConfigs = anyConnectorConfigMap();
connectorConfigs.put(TASKS_MAX_ENFORCE_CONFIG, Boolean.toString(enforced));
connectorConfigs.put(TOPICS_CONFIG, "t1");
connectorConfigs.put(CONNECTOR_CLASS_CONFIG, SampleSinkConnector.class.getName());
// The connector is configured to generate at most one task config...
int maxTasks = 1;
connectorConfigs.put(TASKS_MAX_CONFIG, Integer.toString(maxTasks));
String connName = TASK_ID.connector();
int numTasks = 2;
ClusterConfigState configState = new ClusterConfigState(
0,
null,
// ... but it has generated two task configs
Collections.singletonMap(connName, numTasks),
Collections.singletonMap(connName, connectorConfigs),
Collections.singletonMap(connName, TargetState.STARTED),
Collections.singletonMap(TASK_ID, origProps),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptySet(),
Collections.emptySet()
);
String tasksMaxExceededMessage;
try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(Worker.class)) {
if (enforced) {
assertFalse(worker.startSinkTask(
TASK_ID,
configState,
connectorConfigs,
origProps,
taskStatusListener,
TargetState.STARTED
));
ArgumentCaptor<Throwable> failureCaptor = ArgumentCaptor.forClass(Throwable.class);
verify(taskStatusListener, times(1)).onFailure(eq(TASK_ID), failureCaptor.capture());
assertTrue(
"Expected task start exception to be TooManyTasksException, but was "
+ failureCaptor.getValue().getClass() + " instead",
failureCaptor.getValue() instanceof TooManyTasksException
);
tasksMaxExceededMessage = failureCaptor.getValue().getMessage();
} else {
mockTaskIsolation(SampleSinkConnector.class, TestSinkTask.class, task);
mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, taskKeyConverter);
mockTaskConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, taskValueConverter);
mockTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, taskHeaderConverter);
mockExecutorFakeSubmit(WorkerTask.class);
assertTrue(worker.startSinkTask(
TASK_ID,
configState,
connectorConfigs,
origProps,
taskStatusListener,
TargetState.STARTED
));
List<String> warningMessages = logCaptureAppender.getMessages("WARN");
assertEquals(1, warningMessages.size());
tasksMaxExceededMessage = warningMessages.get(0);
}
assertTasksMaxExceededMessage(connName, numTasks, maxTasks, tasksMaxExceededMessage);
}
}
private void assertTasksMaxExceededMessage(String connector, int numTasks, int maxTasks, String message) {
String expectedPrefix = "The connector " + connector
+ " has generated "
+ numTasks + " tasks, which is greater than "
+ maxTasks;
assertTrue(
"Warning/exception message '"
+ message + "' did not start with the expected prefix '"
+ expectedPrefix + "'",
message.startsWith(expectedPrefix)
);
}
private void assertStatusMetrics(long expected, String metricName) {
MetricGroup statusMetrics = worker.connectorStatusMetricsGroup().metricGroup(TASK_ID.connector());
if (expected == 0L) {

View File: DistributedHerderTest.java

@ -40,6 +40,7 @@ import org.apache.kafka.connect.runtime.SourceConnectorConfig;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.TooManyTasksException;
import org.apache.kafka.connect.runtime.TopicStatus;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.WorkerConfig;
@ -3267,6 +3268,47 @@ public class DistributedHerderTest {
expectAndVerifyTaskReconfigurationRetries();
}
@Test
public void testTaskReconfigurationNoRetryWithTooManyTasks() {
// initial tick
when(member.memberId()).thenReturn("leader");
when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
expectConfigRefreshAndSnapshot(SNAPSHOT);
when(worker.isRunning(CONN1)).thenReturn(true);
when(worker.getPlugins()).thenReturn(plugins);
herder.tick();
// No requests are queued, so we shouldn't plan on waking up without external action
// (i.e., rebalance, user request, or shutdown)
// This helps indicate that no retriable operations (such as generating task configs after
// a backoff period) are queued up by the worker
verify(member, times(1)).poll(eq(Long.MAX_VALUE), any());
// Process the task reconfiguration request in this tick
int numTasks = MAX_TASKS + 5;
SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, CONN1_CONFIG);
// Fail to generate tasks because the connector provided too many task configs
when(worker.connectorTaskConfigs(CONN1, sinkConnectorConfig))
.thenThrow(new TooManyTasksException(CONN1, numTasks, MAX_TASKS));
herder.requestTaskReconfiguration(CONN1);
herder.tick();
// We tried to generate task configs for the connector one time during this tick
verify(worker, times(1)).connectorTaskConfigs(CONN1, sinkConnectorConfig);
// Verifying again that no requests are queued
verify(member, times(2)).poll(eq(Long.MAX_VALUE), any());
verifyNoMoreInteractions(worker);
time.sleep(DistributedHerder.RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MAX_MS);
herder.tick();
// We ticked one more time, and no further attempt was made to generate task configs
verifyNoMoreInteractions(worker);
// And we don't have any requests queued
verify(member, times(3)).poll(eq(Long.MAX_VALUE), any());
}
@Test
public void testTaskReconfigurationRetriesWithLeaderRequestForwardingException() {
herder = mock(DistributedHerder.class, withSettings().defaultAnswer(CALLS_REAL_METHODS).useConstructor(new DistributedConfig(HERDER_CONFIG),

View File: ConnectAssertions.java

@ -430,6 +430,29 @@ public class ConnectAssertions {
);
}
/**
* Assert that a connector is in FAILED state, that it has a specific number of tasks, and that all of
* its tasks are in the RUNNING state.
*
* @param connectorName the connector name
* @param numTasks the number of tasks
* @param detailMessage the assertion message
* @throws InterruptedException if interrupted while awaiting the expected connector and task states
*/
public void assertConnectorIsFailedAndNumTasksAreRunning(String connectorName, int numTasks, String detailMessage)
throws InterruptedException {
waitForConnectorState(
connectorName,
AbstractStatus.State.FAILED,
exactly(numTasks),
null,
AbstractStatus.State.RUNNING,
"Either the connector is running or not all the " + numTasks + " tasks are running.",
detailMessage,
CONNECTOR_SETUP_DURATION_MS
);
}
/**
* Assert that a connector does not exist. This can be used to verify that a connector has been successfully deleted.
*

View File: EmbeddedConnect.java

@ -351,6 +351,23 @@ abstract class EmbeddedConnect {
}
}
/**
* Restart an existing task.
*
* @param connName name of the connector
* @param taskNum ID of the task (starting from 0)
* @throws ConnectRestException if the REST API returns error status
* @throws ConnectException for any other error.
*/
public void restartTask(String connName, int taskNum) {
String url = endpointForResource(String.format("connectors/%s/tasks/%d/restart", connName, taskNum));
Response response = requestPost(url, "", Collections.emptyMap());
if (response.getStatus() >= Response.Status.BAD_REQUEST.getStatusCode()) {
throw new ConnectRestException(response.getStatus(),
"Could not execute POST request. Error response: " + responseToString(response));
}
}
/**
* Restart an existing connector and its tasks.
*