KAFKA-9228: Restart tasks on runtime-only connector config changes (#16053)

Reviewers: Greg Harris <greg.harris@aiven.io>
Author: Chris Egerton, 2024-06-10 23:02:08 +02:00
parent cf67774d8c
commit b38d2eb0ea
15 changed files with 433 additions and 16 deletions
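For context: a "runtime-only" change is a connector config update, such as the converter swap exercised by the new integration test below, that leaves the connector's generated task configs unchanged. Before this commit the herder only compared the generated task configs, so such a change never restarted the running tasks. The following is a purely illustrative sketch of that kind of change; the connector class and property values are hypothetical.

import java.util.HashMap;
import java.util.Map;

public class RuntimeOnlyChangeExample {
    public static void main(String[] args) {
        Map<String, String> originalConfig = new HashMap<>();
        originalConfig.put("connector.class", "MySinkConnector"); // hypothetical connector
        originalConfig.put("tasks.max", "1");
        originalConfig.put("topics", "kafka9228");
        originalConfig.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");

        // Swap only the value converter: the connector's taskConfigs() output stays identical,
        // so the pre-existing task-config comparison alone would not trigger a restart.
        Map<String, String> updatedConfig = new HashMap<>(originalConfig);
        updatedConfig.put("value.converter", "org.apache.kafka.connect.storage.StringConverter");

        System.out.println("Connector configs equal: " + originalConfig.equals(updatedConfig)); // false
    }
}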


@ -166,7 +166,7 @@
<!-- connect tests-->
<suppress checks="ClassDataAbstractionCoupling"
files="(DistributedHerder|KafkaBasedLog|WorkerSourceTaskWithTopicCreation|WorkerSourceTask)Test.java"/>
files="(DistributedHerder|KafkaBasedLog|WorkerSourceTaskWithTopicCreation|WorkerSourceTask|Worker)Test.java"/>
<suppress checks="ClassFanOutComplexity"
files="(WorkerSink|WorkerSource|ErrorHandling)Task(|WithTopicCreation)Test.java"/>


@ -79,6 +79,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@ -851,7 +852,8 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
if (rawTaskProps.size() != currentNumTasks) {
log.debug("Connector {} task count changed from {} to {}", connName, currentNumTasks, rawTaskProps.size());
result = true;
} else {
}
if (!result) {
for (int index = 0; index < currentNumTasks; index++) {
ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
if (!rawTaskProps.get(index).equals(configState.rawTaskConfig(taskId))) {
@ -860,6 +862,14 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
}
}
}
if (!result) {
Map<String, String> appliedConnectorConfig = configState.appliedConnectorConfig(connName);
Map<String, String> currentConnectorConfig = configState.connectorConfig(connName);
if (!Objects.equals(appliedConnectorConfig, currentConnectorConfig)) {
log.debug("Forcing task restart for connector {} as its configuration appears to be updated", connName);
result = true;
}
}
if (result) {
log.debug("Reconfiguring connector {}: writing new updated configurations for tasks", connName);
} else {


@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.storage;
import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
import java.util.Map;
/**
* Wrapper class for a connector configuration that has been used to generate task configurations.
* Supports lazy {@link WorkerConfigTransformer#transform(Map) transformation}.
*/
public class AppliedConnectorConfig {
private final Map<String, String> rawConfig;
private Map<String, String> transformedConfig;
/**
* Create a new applied config that has not yet undergone
* {@link WorkerConfigTransformer#transform(Map) transformation}.
* @param rawConfig the non-transformed connector configuration; may be null
*/
public AppliedConnectorConfig(Map<String, String> rawConfig) {
this.rawConfig = rawConfig;
}
/**
* If necessary, {@link WorkerConfigTransformer#transform(Map) transform} the raw
* connector config, then return the result. Transformed configurations are cached and
* returned in all subsequent calls.
* <p>
* This method is thread-safe: it may be invoked from any thread at any time, always
* returns the same transformed config, and performs the transformation at most once
* before caching its result.
* @param configTransformer the transformer to use, if no transformed connector
* config has been cached yet; may be null
* @return the possibly-cached, transformed connector config; may be null
*/
public synchronized Map<String, String> transformedConfig(WorkerConfigTransformer configTransformer) {
if (transformedConfig != null || rawConfig == null)
return transformedConfig;
if (configTransformer != null) {
transformedConfig = configTransformer.transform(rawConfig);
} else {
transformedConfig = rawConfig;
}
return transformedConfig;
}
}
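A minimal usage sketch for the class above, assuming (as its Javadoc allows) that the transformer may be null; with a null transformer the raw config is cached and returned as-is, and subsequent calls return the cached result.

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.connect.storage.AppliedConnectorConfig;

public class AppliedConnectorConfigExample {
    public static void main(String[] args) {
        Map<String, String> rawConfig = Collections.singletonMap("topics", "kafka9228");
        AppliedConnectorConfig applied = new AppliedConnectorConfig(rawConfig);

        // First call resolves (here: passes through) the config and caches it;
        // later calls return the cached map without transforming again.
        Map<String, String> first = applied.transformedConfig(null);
        Map<String, String> second = applied.transformedConfig(null);
        assert first == second;
    }
}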


@ -43,6 +43,7 @@ public class ClusterConfigState {
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptySet(),
Collections.emptySet());
@ -55,6 +56,7 @@ public class ClusterConfigState {
final Map<ConnectorTaskId, Map<String, String>> taskConfigs;
final Map<String, Integer> connectorTaskCountRecords;
final Map<String, Integer> connectorTaskConfigGenerations;
final Map<String, AppliedConnectorConfig> appliedConnectorConfigs;
final Set<String> connectorsPendingFencing;
final Set<String> inconsistentConnectors;
@ -66,6 +68,7 @@ public class ClusterConfigState {
Map<ConnectorTaskId, Map<String, String>> taskConfigs,
Map<String, Integer> connectorTaskCountRecords,
Map<String, Integer> connectorTaskConfigGenerations,
Map<String, AppliedConnectorConfig> appliedConnectorConfigs,
Set<String> connectorsPendingFencing,
Set<String> inconsistentConnectors) {
this(offset,
@ -76,6 +79,7 @@ public class ClusterConfigState {
taskConfigs,
connectorTaskCountRecords,
connectorTaskConfigGenerations,
appliedConnectorConfigs,
connectorsPendingFencing,
inconsistentConnectors,
null);
@ -89,6 +93,7 @@ public class ClusterConfigState {
Map<ConnectorTaskId, Map<String, String>> taskConfigs,
Map<String, Integer> connectorTaskCountRecords,
Map<String, Integer> connectorTaskConfigGenerations,
Map<String, AppliedConnectorConfig> appliedConnectorConfigs,
Set<String> connectorsPendingFencing,
Set<String> inconsistentConnectors,
WorkerConfigTransformer configTransformer) {
@ -100,6 +105,7 @@ public class ClusterConfigState {
this.taskConfigs = taskConfigs;
this.connectorTaskCountRecords = connectorTaskCountRecords;
this.connectorTaskConfigGenerations = connectorTaskConfigGenerations;
this.appliedConnectorConfigs = appliedConnectorConfigs;
this.connectorsPendingFencing = connectorsPendingFencing;
this.inconsistentConnectors = inconsistentConnectors;
this.configTransformer = configTransformer;
@ -158,6 +164,19 @@ public class ClusterConfigState {
return connectorConfigs.get(connector);
}
/**
* Get the most recent configuration for the connector from which task configs have
* been generated. The configuration will have been transformed by
* {@link org.apache.kafka.common.config.ConfigTransformer}.
* @param connector name of the connector
* @return the connector config, or null if no config exists from which task configs have
* been generated
*/
public Map<String, String> appliedConnectorConfig(String connector) {
AppliedConnectorConfig appliedConfig = appliedConnectorConfigs.get(connector);
return appliedConfig != null ? appliedConfig.transformedConfig(configTransformer) : null;
}
/**
* Get the target state of the connector
* @param connector name of the connector
@ -303,4 +322,5 @@ public class ClusterConfigState {
inconsistentConnectors,
configTransformer);
}
}
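A sketch of how the new accessor pairs with the existing connectorConfig lookup to detect a runtime-only change; it mirrors the AbstractHerder change earlier in this diff, and the wrapper class and method names here are illustrative, not part of the commit.

import java.util.Map;
import java.util.Objects;

import org.apache.kafka.connect.storage.ClusterConfigState;

public class AppliedConfigCheck {
    /**
     * Returns true if the connector's latest config differs from the config its current
     * task configs were generated from, i.e. tasks should be restarted even though the
     * task configs themselves are unchanged.
     */
    public static boolean configUpdatedSinceTasksGenerated(ClusterConfigState configState, String connName) {
        Map<String, String> applied = configState.appliedConnectorConfig(connName);
        Map<String, String> current = configState.connectorConfig(connName);
        return !Objects.equals(applied, current);
    }
}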


@ -321,6 +321,7 @@ public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore impleme
final Map<String, Integer> connectorTaskCountRecords = new HashMap<>();
final Map<String, Integer> connectorTaskConfigGenerations = new HashMap<>();
final Map<String, AppliedConnectorConfig> appliedConnectorConfigs = new HashMap<>();
final Set<String> connectorsPendingFencing = new HashSet<>();
private final WorkerConfigTransformer configTransformer;
@ -482,6 +483,7 @@ public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore impleme
new HashMap<>(taskConfigs),
new HashMap<>(connectorTaskCountRecords),
new HashMap<>(connectorTaskConfigGenerations),
new HashMap<>(appliedConnectorConfigs),
new HashSet<>(connectorsPendingFencing),
new HashSet<>(inconsistent),
configTransformer
@ -1076,7 +1078,8 @@ public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore impleme
// but compaction took place and both the original connector config and the
// tombstone message for it have been removed from the config topic
// We should ignore these task configs
if (!connectorConfigs.containsKey(connectorName)) {
Map<String, String> appliedConnectorConfig = connectorConfigs.get(connectorName);
if (appliedConnectorConfig == null) {
processConnectorRemoval(connectorName);
log.debug(
"Ignoring task configs for connector {}; it appears that the connector was deleted previously "
@ -1134,6 +1137,11 @@ public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore impleme
connectorTaskConfigGenerations.compute(connectorName, (ignored, generation) -> generation != null ? generation + 1 : 0);
}
inconsistent.remove(connectorName);
appliedConnectorConfigs.put(
connectorName,
new AppliedConnectorConfig(appliedConnectorConfig)
);
}
// Always clear the deferred entries, even if we didn't apply them. If they represented an inconsistent
// update, then we need to see a completely fresh set of configs after this commit message, so we don't
@ -1272,6 +1280,7 @@ public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore impleme
connectorTaskCounts.remove(connectorName);
taskConfigs.keySet().removeIf(taskId -> taskId.connector().equals(connectorName));
deferredTaskUpdates.remove(connectorName);
appliedConnectorConfigs.remove(connectorName);
}
private ConnectorTaskId parseTaskId(String key) {
@ -1344,5 +1353,6 @@ public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore impleme
else
throw new ConnectException("Expected integer value to be either Integer or Long");
}
}


@ -21,6 +21,8 @@ import org.apache.kafka.connect.runtime.SessionKey;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
@ -36,6 +38,8 @@ import java.util.concurrent.TimeUnit;
*/
public class MemoryConfigBackingStore implements ConfigBackingStore {
private static final Logger log = LoggerFactory.getLogger(MemoryConfigBackingStore.class);
private final Map<String, ConnectorState> connectors = new HashMap<>();
private UpdateListener updateListener;
private WorkerConfigTransformer configTransformer;
@ -61,6 +65,7 @@ public class MemoryConfigBackingStore implements ConfigBackingStore {
Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
Map<String, TargetState> connectorTargetStates = new HashMap<>();
Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
Map<String, AppliedConnectorConfig> appliedConnectorConfigs = new HashMap<>();
for (Map.Entry<String, ConnectorState> connectorStateEntry : connectors.entrySet()) {
String connector = connectorStateEntry.getKey();
@ -69,6 +74,9 @@ public class MemoryConfigBackingStore implements ConfigBackingStore {
connectorConfigs.put(connector, connectorState.connConfig);
connectorTargetStates.put(connector, connectorState.targetState);
taskConfigs.putAll(connectorState.taskConfigs);
if (connectorState.appliedConnConfig != null) {
appliedConnectorConfigs.put(connector, connectorState.appliedConnConfig);
}
}
return new ClusterConfigState(
@ -80,6 +88,7 @@ public class MemoryConfigBackingStore implements ConfigBackingStore {
taskConfigs,
Collections.emptyMap(),
Collections.emptyMap(),
appliedConnectorConfigs,
Collections.emptySet(),
Collections.emptySet(),
configTransformer
@ -123,6 +132,7 @@ public class MemoryConfigBackingStore implements ConfigBackingStore {
HashSet<ConnectorTaskId> taskIds = new HashSet<>(state.taskConfigs.keySet());
state.taskConfigs.clear();
state.appliedConnConfig = null;
if (updateListener != null)
updateListener.onTaskConfigUpdate(taskIds);
@ -137,6 +147,8 @@ public class MemoryConfigBackingStore implements ConfigBackingStore {
Map<ConnectorTaskId, Map<String, String>> taskConfigsMap = taskConfigListAsMap(connector, configs);
state.taskConfigs = taskConfigsMap;
state.applyConfig();
if (updateListener != null)
updateListener.onTaskConfigUpdate(taskConfigsMap.keySet());
}
@ -187,6 +199,7 @@ public class MemoryConfigBackingStore implements ConfigBackingStore {
private TargetState targetState;
private Map<String, String> connConfig;
private Map<ConnectorTaskId, Map<String, String>> taskConfigs;
private AppliedConnectorConfig appliedConnConfig;
/**
* @param connConfig the connector's configuration
@ -197,6 +210,11 @@ public class MemoryConfigBackingStore implements ConfigBackingStore {
this.targetState = targetState == null ? TargetState.STARTED : targetState;
this.connConfig = connConfig;
this.taskConfigs = new HashMap<>();
this.appliedConnConfig = null;
}
public void applyConfig() {
this.appliedConnConfig = new AppliedConnectorConfig(connConfig);
}
}


@ -17,14 +17,24 @@
package org.apache.kafka.connect.integration;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.provider.FileConfigProvider;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.SinkUtils;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.WorkerHandle;
import org.apache.kafka.test.IntegrationTest;
@ -41,6 +51,7 @@ import org.slf4j.event.Level;
import java.io.File;
import java.io.FileOutputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -50,6 +61,8 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
@ -1083,6 +1096,74 @@ public class ConnectWorkerIntegrationTest {
);
}
@Test
public void testRuntimePropertyReconfiguration() throws Exception {
final int offsetCommitIntervalMs = 1_000;
// force fast offset commits
workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, Integer.toString(offsetCommitIntervalMs));
connect = connectBuilder.build();
// start the clusters
connect.start();
connect.assertions().assertAtLeastNumWorkersAreUp(
NUM_WORKERS,
"Initial group of workers did not start in time."
);
final String topic = "kafka9228";
connect.kafka().createTopic(topic, 1);
connect.kafka().produce(topic, "non-json-value");
Map<String, String> connectorConfig = new HashMap<>();
connectorConfig.put(CONNECTOR_CLASS_CONFIG, EmptyTaskConfigsConnector.class.getName());
connectorConfig.put(TASKS_MAX_CONFIG, "1");
connectorConfig.put(TOPICS_CONFIG, topic);
// Initially configure the connector to use the JSON converter, which should cause task failure(s)
connectorConfig.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
connectorConfig.put(
VALUE_CONVERTER_CLASS_CONFIG + "." + JsonConverterConfig.SCHEMAS_ENABLE_CONFIG,
"false"
);
connect.configureConnector(CONNECTOR_NAME, connectorConfig);
connect.assertions().assertConnectorIsRunningAndTasksHaveFailed(
CONNECTOR_NAME,
1,
"Connector did not start or task did not fail in time"
);
assertEquals(
"Connector should not have any committed offsets when only task fails on first record",
new ConnectorOffsets(Collections.emptyList()),
connect.connectorOffsets(CONNECTOR_NAME)
);
// Reconfigure the connector to use the string converter, which should not cause any more task failures
connectorConfig.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
connectorConfig.remove(
KEY_CONVERTER_CLASS_CONFIG + "." + JsonConverterConfig.SCHEMAS_ENABLE_CONFIG
);
connect.configureConnector(CONNECTOR_NAME, connectorConfig);
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
CONNECTOR_NAME,
1,
"Connector or tasks did not start in time"
);
Map<String, Object> expectedOffsetKey = new HashMap<>();
expectedOffsetKey.put(SinkUtils.KAFKA_TOPIC_KEY, topic);
expectedOffsetKey.put(SinkUtils.KAFKA_PARTITION_KEY, 0);
Map<String, Object> expectedOffsetValue = Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 1);
ConnectorOffset expectedOffset = new ConnectorOffset(expectedOffsetKey, expectedOffsetValue);
ConnectorOffsets expectedOffsets = new ConnectorOffsets(Collections.singletonList(expectedOffset));
// Wait for it to commit offsets, signaling that it has successfully processed the record we produced earlier
waitForCondition(
() -> expectedOffsets.equals(connect.connectorOffsets(CONNECTOR_NAME)),
offsetCommitIntervalMs * 2,
"Task did not successfully process record and/or commit offsets in time"
);
}
private Map<String, String> defaultSourceConnectorProps(String topic) {
// setup props for the source connector
Map<String, String> props = new HashMap<>();
@ -1097,4 +1178,60 @@ public class ConnectWorkerIntegrationTest {
props.put(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG, String.valueOf(1));
return props;
}
public static class EmptyTaskConfigsConnector extends SinkConnector {
@Override
public String version() {
return "0.0";
}
@Override
public void start(Map<String, String> props) {
// no-op
}
@Override
public Class<? extends Task> taskClass() {
return SimpleTask.class;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
return IntStream.range(0, maxTasks)
.mapToObj(i -> Collections.<String, String>emptyMap())
.collect(Collectors.toList());
}
@Override
public void stop() {
// no-op
}
@Override
public ConfigDef config() {
return new ConfigDef();
}
}
public static class SimpleTask extends SinkTask {
@Override
public String version() {
return "0.0";
}
@Override
public void start(Map<String, String> props) {
// no-op
}
@Override
public void put(Collection<SinkRecord> records) {
// no-op
}
@Override
public void stop() {
// no-op
}
}
}


@ -47,6 +47,7 @@ import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
import org.apache.kafka.connect.storage.AppliedConnectorConfig;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
@ -149,6 +150,7 @@ public class AbstractHerderTest {
TASK_CONFIGS_MAP,
Collections.emptyMap(),
Collections.emptyMap(),
Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG)),
Collections.emptySet(),
Collections.emptySet());
private static final ClusterConfigState SNAPSHOT_NO_TASKS = new ClusterConfigState(
@ -160,6 +162,7 @@ public class AbstractHerderTest {
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG)),
Collections.emptySet(),
Collections.emptySet());
@ -1143,6 +1146,44 @@ public class AbstractHerderTest {
verify(snapshot, never()).taskConfig(any());
}
@Test
public void testTaskConfigsChangedWhenAppliedConnectorConfigDiffers() {
assertFalse(AbstractHerder.taskConfigsChanged(SNAPSHOT, CONN1, TASK_CONFIGS));
ClusterConfigState snapshotWithNoAppliedConfig = new ClusterConfigState(
1,
null,
Collections.singletonMap(CONN1, 3),
Collections.singletonMap(CONN1, CONN1_CONFIG),
Collections.singletonMap(CONN1, TargetState.STARTED),
TASK_CONFIGS_MAP,
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptySet(),
Collections.emptySet()
);
assertTrue(AbstractHerder.taskConfigsChanged(snapshotWithNoAppliedConfig, CONN1, TASK_CONFIGS));
Map<String, String> appliedConfig = new HashMap<>(CONN1_CONFIG);
String newTopicsProperty = appliedConfig.getOrDefault(SinkConnectorConfig.TOPICS_CONFIG, "foo") + ",newTopic";
appliedConfig.put(SinkConnectorConfig.TOPICS_CONFIG, newTopicsProperty);
ClusterConfigState snapshotWithDifferentAppliedConfig = new ClusterConfigState(
1,
null,
Collections.singletonMap(CONN1, 3),
Collections.singletonMap(CONN1, CONN1_CONFIG),
Collections.singletonMap(CONN1, TargetState.STARTED),
TASK_CONFIGS_MAP,
Collections.emptyMap(),
Collections.emptyMap(),
Collections.singletonMap(CONN1, new AppliedConnectorConfig(appliedConfig)),
Collections.emptySet(),
Collections.emptySet()
);
assertTrue(AbstractHerder.taskConfigsChanged(snapshotWithDifferentAppliedConfig, CONN1, TASK_CONFIGS));
}
protected void addConfigKey(Map<String, ConfigDef.ConfigKey> keys, String name, String group) {
keys.put(name, new ConfigDef.ConfigKey(name, ConfigDef.Type.STRING, null, null,
ConfigDef.Importance.HIGH, "doc", group, 10,


@ -71,6 +71,7 @@ import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.AppliedConnectorConfig;
import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
@ -616,7 +617,23 @@ public class WorkerTest {
assertStatistics(worker, 0, 0);
assertEquals(Collections.emptySet(), worker.taskIds());
worker.startSourceTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED);
Map<String, String> connectorConfigs = anyConnectorConfigMap();
ClusterConfigState configState = new ClusterConfigState(
0,
null,
Collections.singletonMap(CONNECTOR_ID, 1),
Collections.singletonMap(CONNECTOR_ID, connectorConfigs),
Collections.singletonMap(CONNECTOR_ID, TargetState.STARTED),
Collections.singletonMap(TASK_ID, origProps),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.singletonMap(CONNECTOR_ID, new AppliedConnectorConfig(connectorConfigs)),
Collections.emptySet(),
Collections.emptySet()
);
assertTrue(worker.startSourceTask(TASK_ID, configState, connectorConfigs, origProps, taskStatusListener, TargetState.STARTED));
assertStatistics(worker, 0, 1);
assertEquals(Collections.singleton(TASK_ID), worker.taskIds());
worker.stopAndAwaitTask(TASK_ID);
@ -659,7 +676,21 @@ public class WorkerTest {
connectorConfigs.put(TOPICS_CONFIG, "t1");
connectorConfigs.put(CONNECTOR_CLASS_CONFIG, SampleSinkConnector.class.getName());
worker.startSinkTask(TASK_ID, ClusterConfigState.EMPTY, connectorConfigs, origProps, taskStatusListener, TargetState.STARTED);
ClusterConfigState configState = new ClusterConfigState(
0,
null,
Collections.singletonMap(CONNECTOR_ID, 1),
Collections.singletonMap(CONNECTOR_ID, connectorConfigs),
Collections.singletonMap(CONNECTOR_ID, TargetState.STARTED),
Collections.singletonMap(TASK_ID, origProps),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.singletonMap(CONNECTOR_ID, new AppliedConnectorConfig(connectorConfigs)),
Collections.emptySet(),
Collections.emptySet()
);
assertTrue(worker.startSinkTask(TASK_ID, configState, connectorConfigs, origProps, taskStatusListener, TargetState.STARTED));
assertStatistics(worker, 0, 1);
assertEquals(Collections.singleton(TASK_ID), worker.taskIds());
worker.stopAndAwaitTask(TASK_ID);
@ -715,7 +746,23 @@ public class WorkerTest {
assertStatistics(worker, 0, 0);
assertEquals(Collections.emptySet(), worker.taskIds());
worker.startExactlyOnceSourceTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED, preProducer, postProducer);
Map<String, String> connectorConfigs = anyConnectorConfigMap();
ClusterConfigState configState = new ClusterConfigState(
0,
null,
Collections.singletonMap(CONNECTOR_ID, 1),
Collections.singletonMap(CONNECTOR_ID, connectorConfigs),
Collections.singletonMap(CONNECTOR_ID, TargetState.STARTED),
Collections.singletonMap(TASK_ID, origProps),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.singletonMap(CONNECTOR_ID, new AppliedConnectorConfig(connectorConfigs)),
Collections.emptySet(),
Collections.emptySet()
);
assertTrue(worker.startExactlyOnceSourceTask(TASK_ID, configState, connectorConfigs, origProps, taskStatusListener, TargetState.STARTED, preProducer, postProducer));
assertStatistics(worker, 0, 1);
assertEquals(Collections.singleton(TASK_ID), worker.taskIds());
worker.stopAndAwaitTask(TASK_ID);


@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.runtime;
import org.apache.kafka.connect.storage.AppliedConnectorConfig;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.runtime.distributed.ExtendedAssignment;
import org.apache.kafka.connect.runtime.distributed.ExtendedWorkerState;
@ -63,15 +64,22 @@ public class WorkerTestUtils {
public static ClusterConfigState clusterConfigState(long offset,
int connectorNum,
int taskNum) {
Map<String, Map<String, String>> connectorConfigs = connectorConfigs(1, connectorNum);
Map<String, AppliedConnectorConfig> appliedConnectorConfigs = connectorConfigs.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
e -> new AppliedConnectorConfig(e.getValue())
));
return new ClusterConfigState(
offset,
null,
connectorTaskCounts(1, connectorNum, taskNum),
connectorConfigs(1, connectorNum),
connectorConfigs,
connectorTargetStates(1, connectorNum, TargetState.STARTED),
taskConfigs(0, connectorNum, connectorNum * taskNum),
Collections.emptyMap(),
Collections.emptyMap(),
appliedConnectorConfigs,
Collections.emptySet(),
Collections.emptySet());
}


@ -63,6 +63,7 @@ import org.apache.kafka.connect.source.ConnectorTransactionBoundaries;
import org.apache.kafka.connect.source.ExactlyOnceSupport;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.AppliedConnectorConfig;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
@ -219,6 +220,7 @@ public class DistributedHerderTest {
TASK_CONFIGS_MAP,
Collections.emptyMap(),
Collections.emptyMap(),
Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG)),
Collections.emptySet(),
Collections.emptySet());
private static final ClusterConfigState SNAPSHOT_PAUSED_CONN1 = new ClusterConfigState(
@ -230,6 +232,7 @@ public class DistributedHerderTest {
TASK_CONFIGS_MAP,
Collections.emptyMap(),
Collections.emptyMap(),
Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG)),
Collections.emptySet(),
Collections.emptySet());
private static final ClusterConfigState SNAPSHOT_STOPPED_CONN1 = new ClusterConfigState(
@ -241,6 +244,7 @@ public class DistributedHerderTest {
Collections.emptyMap(), // Stopped connectors should have an empty set of task configs
Collections.singletonMap(CONN1, 3),
Collections.singletonMap(CONN1, 10),
Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG)),
Collections.singleton(CONN1),
Collections.emptySet());
@ -253,6 +257,7 @@ public class DistributedHerderTest {
Collections.emptyMap(),
Collections.singletonMap(CONN1, 0),
Collections.singletonMap(CONN1, 11),
Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG)),
Collections.emptySet(),
Collections.emptySet());
private static final ClusterConfigState SNAPSHOT_UPDATED_CONN1_CONFIG = new ClusterConfigState(
@ -264,6 +269,7 @@ public class DistributedHerderTest {
TASK_CONFIGS_MAP,
Collections.emptyMap(),
Collections.emptyMap(),
Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG_UPDATED)),
Collections.emptySet(),
Collections.emptySet());
@ -631,6 +637,7 @@ public class DistributedHerderTest {
TASK_CONFIGS_MAP,
Collections.emptyMap(),
Collections.emptyMap(),
Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG)),
Collections.emptySet(),
Collections.emptySet()
);
@ -1615,6 +1622,7 @@ public class DistributedHerderTest {
TASK_CONFIGS_MAP,
Collections.emptyMap(),
Collections.emptyMap(),
Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG)),
Collections.emptySet(),
Collections.emptySet(),
configTransformer
@ -2219,6 +2227,7 @@ public class DistributedHerderTest {
TASK_CONFIGS_MAP,
Collections.emptyMap(),
Collections.emptyMap(),
Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG)),
Collections.emptySet(),
Collections.emptySet(),
configTransformer);
@ -2362,6 +2371,7 @@ public class DistributedHerderTest {
TASK_CONFIGS_MAP,
Collections.emptyMap(),
Collections.emptyMap(),
Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG)),
Collections.emptySet(),
Collections.emptySet());
expectConfigRefreshAndSnapshot(snapshotWithKey);
@ -2408,6 +2418,7 @@ public class DistributedHerderTest {
TASK_CONFIGS_MAP,
Collections.emptyMap(),
Collections.emptyMap(),
Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG)),
Collections.emptySet(),
Collections.emptySet());
expectConfigRefreshAndSnapshot(snapshotWithKey);
@ -2609,6 +2620,7 @@ public class DistributedHerderTest {
TASK_CONFIGS_MAP,
Collections.emptyMap(),
Collections.emptyMap(),
Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG)),
Collections.emptySet(),
Collections.emptySet());
@ -3094,6 +3106,7 @@ public class DistributedHerderTest {
TASK_CONFIGS_MAP,
Collections.emptyMap(),
taskConfigGenerations,
Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG)),
Collections.emptySet(),
Collections.emptySet());
@ -3970,6 +3983,15 @@ public class DistributedHerderTest {
Map<String, Map<String, String>> connectorConfigs = connectors.stream()
.collect(Collectors.toMap(Function.identity(), c -> CONN1_CONFIG));
Map<String, AppliedConnectorConfig> appliedConnectorConfigs = taskConfigs.keySet().stream()
.map(ConnectorTaskId::connector)
.distinct()
.filter(connectorConfigs::containsKey)
.collect(Collectors.toMap(
Function.identity(),
connector -> new AppliedConnectorConfig(connectorConfigs.get(connector))
));
return new ClusterConfigState(
1,
sessionKey,
@ -3979,6 +4001,7 @@ public class DistributedHerderTest {
taskConfigs,
taskCountRecords,
taskConfigGenerations,
appliedConnectorConfigs,
pendingFencing,
Collections.emptySet());
}


@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.ConnectorsAndTasks;
import org.apache.kafka.connect.storage.AppliedConnectorConfig;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.util.ConnectorTaskId;
@ -1396,6 +1397,11 @@ public class IncrementalCooperativeAssignorTest {
Function.identity(),
connectorTaskId -> Collections.emptyMap()
));
Map<String, AppliedConnectorConfig> appliedConnectorConfigs = connectorConfigs.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
e -> new AppliedConnectorConfig(e.getValue())
));
return new ClusterConfigState(
CONFIG_OFFSET,
null,
@ -1405,6 +1411,7 @@ public class IncrementalCooperativeAssignorTest {
taskConfigs,
Collections.emptyMap(),
Collections.emptyMap(),
appliedConnectorConfigs,
Collections.emptySet(),
Collections.emptySet());
}


@ -37,6 +37,7 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.storage.AppliedConnectorConfig;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
import org.apache.kafka.connect.util.ConnectorTaskId;
@ -61,6 +62,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility.COMPATIBLE;
import static org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility.EAGER;
@ -171,6 +173,7 @@ public class WorkerCoordinatorTest {
Collections.singletonMap(taskId1x0, new HashMap<>()),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptySet(),
Collections.emptySet()
);
@ -197,6 +200,7 @@ public class WorkerCoordinatorTest {
configState2TaskConfigs,
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptySet(),
Collections.emptySet()
);
@ -217,6 +221,11 @@ public class WorkerCoordinatorTest {
configStateSingleTaskConnectorsTaskConfigs.put(taskId1x0, new HashMap<>());
configStateSingleTaskConnectorsTaskConfigs.put(taskId2x0, new HashMap<>());
configStateSingleTaskConnectorsTaskConfigs.put(taskId3x0, new HashMap<>());
Map<String, AppliedConnectorConfig> appliedConnectorConfigs = configStateSingleTaskConnectorsConnectorConfigs.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
e -> new AppliedConnectorConfig(e.getValue())
));
configStateSingleTaskConnectors = new ClusterConfigState(
12L,
null,
@ -226,6 +235,7 @@ public class WorkerCoordinatorTest {
configStateSingleTaskConnectorsTaskConfigs,
Collections.emptyMap(),
Collections.emptyMap(),
appliedConnectorConfigs,
Collections.emptySet(),
Collections.emptySet()
);


@ -42,6 +42,7 @@ import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
import org.apache.kafka.connect.runtime.distributed.SampleConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
import org.apache.kafka.connect.runtime.rest.entities.Message;
import org.apache.kafka.connect.storage.AppliedConnectorConfig;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
import org.apache.kafka.connect.runtime.isolation.Plugins;
@ -421,6 +422,7 @@ public class StandaloneHerderTest {
Collections.singletonMap(taskId, taskConfig(SourceSink.SOURCE)),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.singletonMap(CONNECTOR_NAME, new AppliedConnectorConfig(connectorConfig)),
new HashSet<>(),
new HashSet<>(),
transformer);
@ -455,6 +457,7 @@ public class StandaloneHerderTest {
Collections.singletonMap(new ConnectorTaskId(CONNECTOR_NAME, 0), taskConfig(SourceSink.SOURCE)),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.singletonMap(CONNECTOR_NAME, new AppliedConnectorConfig(connectorConfig)),
new HashSet<>(),
new HashSet<>(),
transformer);
@ -603,6 +606,7 @@ public class StandaloneHerderTest {
Collections.singletonMap(taskId, taskConfig(SourceSink.SINK)),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.singletonMap(CONNECTOR_NAME, new AppliedConnectorConfig(connectorConfig)),
new HashSet<>(),
new HashSet<>(),
transformer);
@ -664,6 +668,7 @@ public class StandaloneHerderTest {
Collections.singletonMap(taskId, taskConfig(SourceSink.SINK)),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.singletonMap(CONNECTOR_NAME, new AppliedConnectorConfig(connectorConfig)),
new HashSet<>(),
new HashSet<>(),
transformer);
@ -776,7 +781,7 @@ public class StandaloneHerderTest {
connector = mock(BogusSourceConnector.class);
expectAdd(SourceSink.SOURCE);
Connector connectorMock = mock(SourceConnector.class);
expectConfigValidation(connectorMock, true, connConfig);
expectConfigValidation(connectorMock, true, connConfig, newConnConfig);
// Should get first config
doNothing().when(connectorConfigCb).onCompletion(null, connConfig);
@ -788,12 +793,15 @@ public class StandaloneHerderTest {
onStart.getValue().onCompletion(null, TargetState.STARTED);
return true;
}).when(worker).startConnector(eq(CONNECTOR_NAME), capturedConfig.capture(), any(),
eq(herder), eq(TargetState.STARTED), onStart.capture());
// Generate same task config, which should result in no additional action to restart tasks
eq(herder), eq(TargetState.STARTED), onStart.capture());
ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0);
// Generate the same task config, but from a different connector config, resulting
// in task restarts
when(worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(plugins, newConnConfig, true)))
.thenReturn(singletonList(taskConfig(SourceSink.SOURCE)));
expectConfigValidation(connectorMock, false, newConnConfig);
.thenReturn(singletonList(taskConfig(SourceSink.SOURCE)));
doNothing().when(worker).stopAndAwaitTasks(Collections.singletonList(taskId));
doNothing().when(statusBackingStore).put(new TaskStatus(taskId, TaskStatus.State.DESTROYED, WORKER_ID, 0));
when(worker.startSourceTask(eq(taskId), any(), eq(newConnConfig), eq(taskConfig(SourceSink.SOURCE)), eq(herder), eq(TargetState.STARTED))).thenReturn(true);
herder.putConnectorConfig(CONNECTOR_NAME, connConfig, false, createCallback);
Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS);
@ -920,6 +928,8 @@ public class StandaloneHerderTest {
@Test
public void testModifyConnectorOffsetsConnectorNotInStoppedState() {
Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE);
herder.configState = new ClusterConfigState(
10,
null,
@ -929,6 +939,7 @@ public class StandaloneHerderTest {
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.singletonMap(CONNECTOR_NAME, new AppliedConnectorConfig(connectorConfig)),
Collections.emptySet(),
Collections.emptySet()
);
@ -955,6 +966,8 @@ public class StandaloneHerderTest {
return null;
}).when(worker).modifyConnectorOffsets(eq(CONNECTOR_NAME), eq(connectorConfig(SourceSink.SOURCE)), any(Map.class), workerCallbackCapture.capture());
Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE);
herder.configState = new ClusterConfigState(
10,
null,
@ -964,6 +977,7 @@ public class StandaloneHerderTest {
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.singletonMap(CONNECTOR_NAME, new AppliedConnectorConfig(connectorConfig)),
Collections.emptySet(),
Collections.emptySet()
);
@ -984,6 +998,8 @@ public class StandaloneHerderTest {
return null;
}).when(worker).modifyConnectorOffsets(eq(CONNECTOR_NAME), eq(connectorConfig(SourceSink.SOURCE)), isNull(), workerCallbackCapture.capture());
Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE);
herder.configState = new ClusterConfigState(
10,
null,
@ -993,6 +1009,7 @@ public class StandaloneHerderTest {
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.singletonMap(CONNECTOR_NAME, new AppliedConnectorConfig(connectorConfig)),
Collections.emptySet(),
Collections.emptySet()
);
@ -1020,6 +1037,7 @@ public class StandaloneHerderTest {
// And we should instantiate the tasks. For a sink task, we should see added properties for the input topic partitions
Map<String, String> connectorConfig = connectorConfig(sourceSink);
Map<String, String> generatedTaskProps = taskConfig(sourceSink);
when(worker.connectorTaskConfigs(CONNECTOR_NAME, connConfig))
@ -1029,11 +1047,12 @@ public class StandaloneHerderTest {
-1,
null,
Collections.singletonMap(CONNECTOR_NAME, 1),
Collections.singletonMap(CONNECTOR_NAME, connectorConfig(sourceSink)),
Collections.singletonMap(CONNECTOR_NAME, connectorConfig),
Collections.singletonMap(CONNECTOR_NAME, TargetState.STARTED),
Collections.singletonMap(new ConnectorTaskId(CONNECTOR_NAME, 0), generatedTaskProps),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.singletonMap(CONNECTOR_NAME, new AppliedConnectorConfig(connectorConfig)),
new HashSet<>(),
new HashSet<>(),
transformer);


@ -17,4 +17,5 @@ org.apache.kafka.connect.integration.BlockingConnectorTest$BlockingSinkConnector
org.apache.kafka.connect.integration.BlockingConnectorTest$TaskInitializeBlockingSinkConnector
org.apache.kafka.connect.integration.ErrantRecordSinkConnector
org.apache.kafka.connect.integration.MonitorableSinkConnector
org.apache.kafka.connect.runtime.SampleSinkConnector
org.apache.kafka.connect.runtime.SampleSinkConnector
org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest$EmptyTaskConfigsConnector