KAFKA-15470: Allow creating connectors in a stopped state (#14704)

Reviewers: Chris Egerton <chrise@aiven.io>
Author: Yash Mayya, 2023-11-15 06:07:50 +00:00 (committed by GitHub)
parent 83b7c9a053
commit 1bc4de75a2
23 changed files with 929 additions and 85 deletions

File: bin/connect-standalone.sh

@@ -16,7 +16,7 @@
if [ $# -lt 1 ];
then
echo "USAGE: $0 [-daemon] connect-standalone.properties"
echo "USAGE: $0 [-daemon] connect-standalone.properties [connector1.properties connector2.json ...]"
exit 1
fi
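
As a usage sketch (file names hypothetical), the standalone launcher now accepts connector definitions as Java properties files, as JSON files, or any mix of the two:

    bin/connect-standalone.sh config/connect-standalone.properties \
        my-source.properties my-sink.json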

File: checkstyle/import-control.xml

@@ -534,6 +534,7 @@
</subpackage>
<subpackage name="cli">
<allow pkg="com.fasterxml.jackson" />
<allow pkg="org.apache.kafka.connect.runtime" />
<allow pkg="org.apache.kafka.connect.storage" />
<allow pkg="org.apache.kafka.connect.util" />

File: ConnectStandalone.java

@@ -16,20 +16,26 @@
*/
package org.apache.kafka.connect.cli;
import com.fasterxml.jackson.core.exc.StreamReadException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DatabindException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.runtime.Connect;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
@@ -38,9 +44,14 @@ import org.apache.kafka.connect.util.FutureCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.Map;
import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG;
/**
* <p>
* Command line utility that runs Kafka Connect as a standalone process. In this mode, work (connectors and tasks) is not
@@ -61,23 +72,24 @@ public class ConnectStandalone extends AbstractConnectCli<StandaloneConfig> {
@Override
protected String usage() {
return "ConnectStandalone worker.properties [connector1.properties connector2.properties ...]";
return "ConnectStandalone worker.properties [connector1.properties connector2.json ...]";
}
@Override
protected void processExtraArgs(Herder herder, Connect connect, String[] extraArgs) {
try {
for (final String connectorPropsFile : extraArgs) {
Map<String, String> connectorProps = Utils.propsToStringMap(Utils.loadProps(connectorPropsFile));
for (final String connectorConfigFile : extraArgs) {
CreateConnectorRequest createConnectorRequest = parseConnectorConfigurationFile(connectorConfigFile);
FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>((error, info) -> {
if (error != null)
log.error("Failed to create connector for {}", connectorPropsFile);
log.error("Failed to create connector for {}", connectorConfigFile);
else
log.info("Created connector {}", info.result().name());
});
herder.putConnectorConfig(
connectorProps.get(ConnectorConfig.NAME_CONFIG),
connectorProps, false, cb);
createConnectorRequest.name(), createConnectorRequest.config(),
createConnectorRequest.initialTargetState(),
false, cb);
cb.get();
}
} catch (Throwable t) {
@@ -87,6 +99,64 @@ public class ConnectStandalone extends AbstractConnectCli<StandaloneConfig> {
}
}
/**
* Parse a connector configuration file into a {@link CreateConnectorRequest}. The file can have any one of the following formats (note that
* we attempt to parse the file in this order):
* <ol>
* <li>A JSON file containing an Object with only String keys and values that represent the connector configuration.</li>
* <li>A JSON file containing an Object that can be parsed directly into a {@link CreateConnectorRequest}</li>
* <li>A valid Java Properties file (i.e. containing String key/value pairs representing the connector configuration)</li>
* </ol>
* <p>
* Visible for testing.
*
* @param filePath the path of the connector configuration file
* @return the parsed connector configuration in the form of a {@link CreateConnectorRequest}
*/
CreateConnectorRequest parseConnectorConfigurationFile(String filePath) throws IOException {
ObjectMapper objectMapper = new ObjectMapper();
File connectorConfigurationFile = Paths.get(filePath).toFile();
try {
Map<String, String> connectorConfigs = objectMapper.readValue(
connectorConfigurationFile,
new TypeReference<Map<String, String>>() { });
if (!connectorConfigs.containsKey(NAME_CONFIG)) {
throw new ConnectException("Connector configuration at '" + filePath + "' is missing the mandatory '" + NAME_CONFIG + "' "
+ "configuration");
}
return new CreateConnectorRequest(connectorConfigs.get(NAME_CONFIG), connectorConfigs, null);
} catch (StreamReadException | DatabindException e) {
log.debug("Could not parse connector configuration file '{}' into a Map with String keys and values", filePath);
}
try {
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
CreateConnectorRequest createConnectorRequest = objectMapper.readValue(connectorConfigurationFile,
new TypeReference<CreateConnectorRequest>() { });
if (createConnectorRequest.config().containsKey(NAME_CONFIG)) {
if (!createConnectorRequest.config().get(NAME_CONFIG).equals(createConnectorRequest.name())) {
throw new ConnectException("Connector name configuration in 'config' doesn't match the one specified in 'name' at '" + filePath
+ "'");
}
} else {
createConnectorRequest.config().put(NAME_CONFIG, createConnectorRequest.name());
}
return createConnectorRequest;
} catch (StreamReadException | DatabindException e) {
log.debug("Could not parse connector configuration file '{}' into an object of type {}",
filePath, CreateConnectorRequest.class.getSimpleName());
}
Map<String, String> connectorConfigs = Utils.propsToStringMap(Utils.loadProps(filePath));
if (!connectorConfigs.containsKey(NAME_CONFIG)) {
throw new ConnectException("Connector configuration at '" + filePath + "' is missing the mandatory '" + NAME_CONFIG + "' "
+ "configuration");
}
return new CreateConnectorRequest(connectorConfigs.get(NAME_CONFIG), connectorConfigs, null);
}
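To make the parse order concrete, the same hypothetical connector could be declared in any of the three accepted file contents below (connector name and class are illustrative, not part of this patch); the parser tries them in the order shown:

    1. A flat JSON object of String keys and values:
       {"name": "demo-source", "connector.class": "org.example.DemoSourceConnector", "topic": "demo"}

    2. A full CreateConnectorRequest, optionally carrying an initial state:
       {"name": "demo-source", "config": {"connector.class": "org.example.DemoSourceConnector", "topic": "demo"}, "initial_state": "STOPPED"}

    3. A Java properties file:
       name=demo-source
       connector.class=org.example.DemoSourceConnector
       topic=demo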
@Override
protected Herder createHerder(StandaloneConfig config, String workerId, Plugins plugins,
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,

File: Herder.java

@@ -108,6 +108,19 @@ public interface Herder {
*/
void putConnectorConfig(String connName, Map<String, String> config, boolean allowReplace, Callback<Created<ConnectorInfo>> callback);
/**
* Set the configuration for a connector, optionally along with a target state. This supports both creation and updating.
* @param connName name of the connector
* @param config the connector's configuration
* @param targetState the desired target state for the connector; may be {@code null} if no target state change is desired. Note that the default
* target state is {@link TargetState#STARTED} if no target state exists previously
* @param allowReplace if true, allow overwriting previous configs; if false, throw {@link AlreadyExistsException}
* if a connector with the same name already exists
* @param callback callback to invoke when the configuration has been written
*/
void putConnectorConfig(String connName, Map<String, String> config, TargetState targetState, boolean allowReplace,
Callback<Created<ConnectorInfo>> callback);
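A minimal caller sketch, assuming an already-constructed Herder; the callback wiring mirrors ConnectStandalone above, and the connector name and class are hypothetical:

    Map<String, String> config = new HashMap<>();
    config.put("name", "demo-source");
    config.put("connector.class", "org.example.DemoSourceConnector"); // hypothetical plugin
    FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>((error, created) -> {
        if (error != null)
            error.printStackTrace(); // e.g. AlreadyExistsException when allowReplace is false
    });
    // A null target state means "no change"; a brand-new connector then defaults to TargetState.STARTED.
    herder.putConnectorConfig("demo-source", config, TargetState.STOPPED, false, cb);
    cb.get(); // block until the config write has been acknowledged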
/**
* Delete a connector and its configuration.
* @param connName name of the connector

File: DistributedHerder.java

@@ -1051,6 +1051,12 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
@Override
public void putConnectorConfig(final String connName, final Map<String, String> config, final boolean allowReplace,
final Callback<Created<ConnectorInfo>> callback) {
putConnectorConfig(connName, config, null, allowReplace, callback);
}
@Override
public void putConnectorConfig(final String connName, final Map<String, String> config, final TargetState targetState,
final boolean allowReplace, final Callback<Created<ConnectorInfo>> callback) {
log.trace("Submitting connector config write request {}", connName);
addRequest(
() -> {
@@ -1081,7 +1087,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
}
log.trace("Submitting connector config {} {} {}", connName, allowReplace, configState.connectors());
writeToConfigTopicAsLeader(() -> configBackingStore.putConnectorConfig(connName, config));
writeToConfigTopicAsLeader(() -> configBackingStore.putConnectorConfig(connName, config, targetState));
// Note that we use the updated connector config despite the fact that we don't have an updated
// snapshot yet. The existing task info should still be accurate.

File: CreateConnectorRequest.java

@@ -18,18 +18,23 @@ package org.apache.kafka.connect.runtime.rest.entities;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.kafka.connect.runtime.TargetState;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
public class CreateConnectorRequest {
private final String name;
private final Map<String, String> config;
private final InitialState initialState;
@JsonCreator
public CreateConnectorRequest(@JsonProperty("name") String name, @JsonProperty("config") Map<String, String> config) {
public CreateConnectorRequest(@JsonProperty("name") String name, @JsonProperty("config") Map<String, String> config,
@JsonProperty("initial_state") InitialState initialState) {
this.name = name;
this.config = config;
this.initialState = initialState;
}
@JsonProperty
@@ -42,17 +47,55 @@ public class CreateConnectorRequest {
return config;
}
@JsonProperty
public InitialState initialState() {
return initialState;
}
public TargetState initialTargetState() {
if (initialState != null) {
return initialState.toTargetState();
} else {
return null;
}
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CreateConnectorRequest that = (CreateConnectorRequest) o;
return Objects.equals(name, that.name) &&
Objects.equals(config, that.config);
Objects.equals(config, that.config) &&
Objects.equals(initialState, that.initialState);
}
@Override
public int hashCode() {
return Objects.hash(name, config);
return Objects.hash(name, config, initialState);
}
public enum InitialState {
RUNNING,
PAUSED,
STOPPED;
@JsonCreator
public static InitialState forValue(String value) {
return InitialState.valueOf(value.toUpperCase(Locale.ROOT));
}
public TargetState toTargetState() {
switch (this) {
case RUNNING:
return TargetState.STARTED;
case PAUSED:
return TargetState.PAUSED;
case STOPPED:
return TargetState.STOPPED;
default:
throw new IllegalArgumentException("Unknown initial state: " + this);
}
}
}
}
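Over the REST API, "initial_state" surfaces as an optional field in the connector creation body; it is matched case-insensitively via forValue and mapped onto the internal TargetState (RUNNING -> STARTED, PAUSED -> PAUSED, STOPPED -> STOPPED). A hedged example body, with hypothetical name and class:

    {
      "name": "demo-source",
      "config": {
        "connector.class": "org.example.DemoSourceConnector",
        "name": "demo-source"
      },
      "initial_state": "STOPPED"
    }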

File: ConnectorsResource.java

@@ -145,7 +145,7 @@ public class ConnectorsResource implements ConnectResource {
checkAndPutConnectorConfigName(name, configs);
FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
herder.putConnectorConfig(name, configs, false, cb);
herder.putConnectorConfig(name, configs, createRequest.initialTargetState(), false, cb);
Herder.Created<ConnectorInfo> info = requestHandler.completeOrForwardRequest(cb, "/connectors", "POST", headers, createRequest,
new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), forward);
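As a usage sketch against a worker's REST endpoint (the URL, port, and body values are assumptions, not from this patch):

    curl -X POST -H "Content-Type: application/json" \
        --data '{"name": "demo-source", "config": {"connector.class": "org.example.DemoSourceConnector"}, "initial_state": "PAUSED"}' \
        http://localhost:8083/connectors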

File: StandaloneHerder.java

@@ -183,6 +183,12 @@ public class StandaloneHerder extends AbstractHerder {
final Map<String, String> config,
boolean allowReplace,
final Callback<Created<ConnectorInfo>> callback) {
putConnectorConfig(connName, config, null, allowReplace, callback);
}
@Override
public void putConnectorConfig(final String connName, final Map<String, String> config, final TargetState targetState,
final boolean allowReplace, final Callback<Created<ConnectorInfo>> callback) {
try {
validateConnectorConfig(config, (error, configInfos) -> {
if (error != null) {
@@ -191,7 +197,7 @@ public class StandaloneHerder extends AbstractHerder {
}
requestExecutorService.submit(
() -> putConnectorConfig(connName, config, allowReplace, callback, configInfos)
() -> putConnectorConfig(connName, config, targetState, allowReplace, callback, configInfos)
);
});
} catch (Throwable t) {
@@ -201,6 +207,7 @@ public class StandaloneHerder extends AbstractHerder {
private synchronized void putConnectorConfig(String connName,
final Map<String, String> config,
TargetState targetState,
boolean allowReplace,
final Callback<Created<ConnectorInfo>> callback,
ConfigInfos configInfos) {
@@ -221,7 +228,7 @@ public class StandaloneHerder extends AbstractHerder {
created = true;
}
configBackingStore.putConnectorConfig(connName, config);
configBackingStore.putConnectorConfig(connName, config, targetState);
startConnector(connName, (error, result) -> {
if (error != null) {

File: ConfigBackingStore.java

@@ -56,8 +56,10 @@ public interface ConfigBackingStore {
* Update the configuration for a connector.
* @param connector name of the connector
* @param properties the connector configuration
* @param targetState the desired target state for the connector; may be {@code null} if no target state change is desired. Note that the default
* target state is {@link TargetState#STARTED} if no target state exists previously
*/
void putConnectorConfig(String connector, Map<String, String> properties);
void putConnectorConfig(String connector, Map<String, String> properties, TargetState targetState);
/**
* Remove configuration for a connector

File: KafkaConfigBackingStore.java

@@ -497,26 +497,34 @@ public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore impleme
}
/**
* Write this connector configuration to persistent storage and wait until it has been acknowledged and read back by
* tailing the Kafka log with a consumer. {@link #claimWritePrivileges()} must be successfully invoked before calling
* Write this connector configuration (and optionally a target state) to persistent storage and wait until it has been acknowledged and read
* back by tailing the Kafka log with a consumer. {@link #claimWritePrivileges()} must be successfully invoked before calling
* this method if the worker is configured to use a fencable producer for writes to the config topic.
*
* @param connector name of the connector to write data for
* @param properties the configuration to write
* @param targetState the desired target state for the connector; may be {@code null} if no target state change is desired. Note that the default
* target state is {@link TargetState#STARTED} if no target state exists previously
* @throws IllegalStateException if {@link #claimWritePrivileges()} is required, but was not successfully invoked before
* this method was called
* @throws PrivilegedWriteException if the worker is configured to use a fencable producer for writes to the config topic
* and the write fails
*/
@Override
public void putConnectorConfig(String connector, Map<String, String> properties) {
public void putConnectorConfig(String connector, Map<String, String> properties, TargetState targetState) {
log.debug("Writing connector configuration for connector '{}'", connector);
Struct connectConfig = new Struct(CONNECTOR_CONFIGURATION_V0);
connectConfig.put("properties", properties);
byte[] serializedConfig = converter.fromConnectData(topic, CONNECTOR_CONFIGURATION_V0, connectConfig);
try {
Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
sendPrivileged(CONNECTOR_KEY(connector), serializedConfig, timer);
List<ProducerKeyValue> keyValues = new ArrayList<>();
if (targetState != null) {
log.debug("Writing target state {} for connector {}", targetState, connector);
keyValues.add(new ProducerKeyValue(TARGET_STATE_KEY(connector), serializeTargetState(targetState)));
}
keyValues.add(new ProducerKeyValue(CONNECTOR_KEY(connector), serializedConfig));
sendPrivileged(keyValues, timer);
configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Failed to write connector configuration to Kafka: ", e);
@@ -647,20 +655,24 @@ public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore impleme
*/
@Override
public void putTargetState(String connector, TargetState state) {
Struct connectTargetState = new Struct(TARGET_STATE_V1);
// Older workers don't support the STOPPED state; fall back on PAUSED
connectTargetState.put("state", state == STOPPED ? PAUSED.name() : state.name());
connectTargetState.put("state.v2", state.name());
byte[] serializedTargetState = converter.fromConnectData(topic, TARGET_STATE_V1, connectTargetState);
log.debug("Writing target state {} for connector {}", state, connector);
try {
configLog.sendWithReceipt(TARGET_STATE_KEY(connector), serializedTargetState).get(READ_WRITE_TOTAL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
configLog.sendWithReceipt(TARGET_STATE_KEY(connector), serializeTargetState(state))
.get(READ_WRITE_TOTAL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("Failed to write target state to Kafka", e);
throw new ConnectException("Error writing target state to Kafka", e);
}
}
private byte[] serializeTargetState(TargetState state) {
Struct connectTargetState = new Struct(TARGET_STATE_V1);
// Older workers don't support the STOPPED state; fall back on PAUSED
connectTargetState.put("state", state == STOPPED ? PAUSED.name() : state.name());
connectTargetState.put("state.v2", state.name());
return converter.fromConnectData(topic, TARGET_STATE_V1, connectTargetState);
}
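Putting the two writes together: for a hypothetical connector "demo-source" created with initial state STOPPED, the config topic would receive roughly the following two records, target state first so that no worker can observe the new config without its initial state (keys per TARGET_STATE_KEY/CONNECTOR_KEY; values shown schematically as the converter's JSON output):

    key:   target-state-demo-source
    value: {"state": "PAUSED", "state.v2": "STOPPED"}    (older workers read "state", so STOPPED degrades to PAUSED for them; newer workers prefer "state.v2")

    key:   connector-demo-source
    value: {"properties": {"name": "demo-source", "connector.class": "org.example.DemoSourceConnector"}}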
/**
* Write a task count record for a connector to persistent storage and wait until it has been acknowledged and read back by
* tailing the Kafka log with a consumer. {@link #claimWritePrivileges()} must be successfully invoked before calling this method
@@ -985,7 +997,9 @@ public class KafkaConfigBackingStore extends KafkaTopicBasedBackingStore impleme
// Note that we do not notify the update listener if the target state has been removed.
// Instead we depend on the removal callback of the connector config itself to notify the worker.
if (started && !removed && stateChanged)
// We also don't notify the update listener if the connector doesn't exist yet - a scenario that can
// occur if an explicit initial target state is specified in the connector creation request.
if (started && !removed && stateChanged && connectorConfigs.containsKey(connectorName))
updateListener.onConnectorTargetStateChange(connectorName);
}

File: MemoryConfigBackingStore.java

@@ -92,12 +92,16 @@ public class MemoryConfigBackingStore implements ConfigBackingStore {
}
@Override
public synchronized void putConnectorConfig(String connector, Map<String, String> properties) {
public synchronized void putConnectorConfig(String connector, Map<String, String> properties, TargetState targetState) {
ConnectorState state = connectors.get(connector);
if (state == null)
connectors.put(connector, new ConnectorState(properties));
else
connectors.put(connector, new ConnectorState(properties, targetState));
else {
state.connConfig = properties;
if (targetState != null) {
state.targetState = targetState;
}
}
if (updateListener != null)
updateListener.onConnectorConfigUpdate(connector);
@@ -184,8 +188,13 @@ public class MemoryConfigBackingStore implements ConfigBackingStore {
private Map<String, String> connConfig;
private Map<ConnectorTaskId, Map<String, String>> taskConfigs;
public ConnectorState(Map<String, String> connConfig) {
this.targetState = TargetState.STARTED;
/**
* @param connConfig the connector's configuration
* @param targetState the connector's initial {@link TargetState}; may be {@code null} in which case the default initial target state
* {@link TargetState#STARTED} will be used
*/
public ConnectorState(Map<String, String> connConfig, TargetState targetState) {
this.targetState = targetState == null ? TargetState.STARTED : targetState;
this.connConfig = connConfig;
this.taskConfigs = new HashMap<>();
}

File: ConnectStandaloneTest.java (new file)

@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.cli;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
public class ConnectStandaloneTest {
private static final String CONNECTOR_NAME = "test-connector";
private static final Map<String, String> CONNECTOR_CONFIG = new HashMap<>();
static {
CONNECTOR_CONFIG.put(NAME_CONFIG, CONNECTOR_NAME);
CONNECTOR_CONFIG.put("key1", "val1");
CONNECTOR_CONFIG.put("key2", "val2");
}
private final ConnectStandalone connectStandalone = new ConnectStandalone();
private File connectorConfigurationFile;
@Before
public void setUp() throws IOException {
connectorConfigurationFile = TestUtils.tempFile();
}
@Test
public void testParseJavaPropertiesFile() throws Exception {
Properties properties = new Properties();
CONNECTOR_CONFIG.forEach(properties::setProperty);
try (FileWriter writer = new FileWriter(connectorConfigurationFile)) {
properties.store(writer, null);
}
CreateConnectorRequest request = connectStandalone.parseConnectorConfigurationFile(connectorConfigurationFile.getAbsolutePath());
assertEquals(CONNECTOR_NAME, request.name());
assertEquals(CONNECTOR_CONFIG, request.config());
assertNull(request.initialState());
}
@Test
public void testParseJsonFileWithConnectorConfiguration() throws Exception {
try (FileWriter writer = new FileWriter(connectorConfigurationFile)) {
writer.write(new ObjectMapper().writeValueAsString(CONNECTOR_CONFIG));
}
CreateConnectorRequest request = connectStandalone.parseConnectorConfigurationFile(connectorConfigurationFile.getAbsolutePath());
assertEquals(CONNECTOR_NAME, request.name());
assertEquals(CONNECTOR_CONFIG, request.config());
assertNull(request.initialState());
}
@Test
public void testParseJsonFileWithCreateConnectorRequest() throws Exception {
CreateConnectorRequest requestToWrite = new CreateConnectorRequest(
CONNECTOR_NAME,
CONNECTOR_CONFIG,
CreateConnectorRequest.InitialState.STOPPED
);
try (FileWriter writer = new FileWriter(connectorConfigurationFile)) {
writer.write(new ObjectMapper().writeValueAsString(requestToWrite));
}
CreateConnectorRequest parsedRequest = connectStandalone.parseConnectorConfigurationFile(connectorConfigurationFile.getAbsolutePath());
assertEquals(requestToWrite, parsedRequest);
}
@Test
public void testParseJsonFileWithCreateConnectorRequestWithoutInitialState() throws Exception {
Map<String, Object> requestToWrite = new HashMap<>();
requestToWrite.put("name", CONNECTOR_NAME);
requestToWrite.put("config", CONNECTOR_CONFIG);
try (FileWriter writer = new FileWriter(connectorConfigurationFile)) {
writer.write(new ObjectMapper().writeValueAsString(requestToWrite));
}
CreateConnectorRequest parsedRequest = connectStandalone.parseConnectorConfigurationFile(connectorConfigurationFile.getAbsolutePath());
CreateConnectorRequest expectedRequest = new CreateConnectorRequest(CONNECTOR_NAME, CONNECTOR_CONFIG, null);
assertEquals(expectedRequest, parsedRequest);
}
@Test
public void testParseJsonFileWithCreateConnectorRequestWithUnknownField() throws Exception {
Map<String, Object> requestToWrite = new HashMap<>();
requestToWrite.put("name", CONNECTOR_NAME);
requestToWrite.put("config", CONNECTOR_CONFIG);
requestToWrite.put("unknown-field", "random-value");
try (FileWriter writer = new FileWriter(connectorConfigurationFile)) {
writer.write(new ObjectMapper().writeValueAsString(requestToWrite));
}
CreateConnectorRequest parsedRequest = connectStandalone.parseConnectorConfigurationFile(connectorConfigurationFile.getAbsolutePath());
CreateConnectorRequest expectedRequest = new CreateConnectorRequest(CONNECTOR_NAME, CONNECTOR_CONFIG, null);
assertEquals(expectedRequest, parsedRequest);
}
}

File: ConnectWorkerIntegrationTest.java

@@ -16,8 +16,11 @@
*/
package org.apache.kafka.connect.integration;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
@@ -49,14 +52,15 @@ import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_
import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
import static org.apache.kafka.connect.util.clusters.ConnectAssertions.CONNECTOR_SETUP_DURATION_MS;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.apache.kafka.connect.util.clusters.ConnectAssertions.CONNECTOR_SETUP_DURATION_MS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -73,7 +77,7 @@ public class ConnectWorkerIntegrationTest {
private static final int NUM_WORKERS = 3;
private static final int NUM_TASKS = 4;
private static final int MESSAGES_PER_POLL = 10;
private static final String CONNECTOR_NAME = "simple-source";
private static final String CONNECTOR_NAME = "simple-connector";
private static final String TOPIC_NAME = "test-topic";
private EmbeddedConnectCluster.Builder connectBuilder;
@@ -529,8 +533,10 @@
// start the clusters
connect.start();
connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
"Initial group of workers did not start in time.");
connect.assertions().assertAtLeastNumWorkersAreUp(
NUM_WORKERS,
"Initial group of workers did not start in time."
);
connect.configureConnector(CONNECTOR_NAME, defaultSourceConnectorProps(TOPIC_NAME));
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
@@ -546,10 +552,227 @@
assertEquals(Level.WARN.toString(), logEvents.get(0).getLevel());
assertThat(logEvents.get(0).getMessage(), containsString("deprecated"));
}
}
@Test
public void testCreateConnectorWithPausedInitialState() throws Exception {
connect = connectBuilder.build();
// start the clusters
connect.start();
connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
"Initial group of workers did not start in time.");
CreateConnectorRequest createConnectorRequest = new CreateConnectorRequest(
CONNECTOR_NAME,
defaultSourceConnectorProps(TOPIC_NAME),
CreateConnectorRequest.InitialState.PAUSED
);
connect.configureConnector(createConnectorRequest);
// Verify that the connector's status is PAUSED and also that no tasks were spawned for the connector
connect.assertions().assertConnectorAndExactlyNumTasksArePaused(
CONNECTOR_NAME,
0,
"Connector was not created in a paused state"
);
assertEquals(Collections.emptyList(), connect.connectorInfo(CONNECTOR_NAME).tasks());
assertEquals(Collections.emptyList(), connect.taskConfigs(CONNECTOR_NAME));
// Verify that a connector created in the PAUSED state can be resumed successfully
connect.resumeConnector(CONNECTOR_NAME);
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
CONNECTOR_NAME,
NUM_TASKS,
"Connector or tasks did not start running healthily in time"
);
}
@Test
public void testCreateSourceConnectorWithStoppedInitialStateAndModifyOffsets() throws Exception {
connect = connectBuilder.build();
// start the clusters
connect.start();
connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
"Initial group of workers did not start in time.");
Map<String, String> props = defaultSourceConnectorProps(TOPIC_NAME);
// Configure the connector to produce a maximum of 10 messages
props.put("max.messages", "10");
props.put(TASKS_MAX_CONFIG, "1");
CreateConnectorRequest createConnectorRequest = new CreateConnectorRequest(
CONNECTOR_NAME,
props,
CreateConnectorRequest.InitialState.STOPPED
);
connect.configureConnector(createConnectorRequest);
// Verify that the connector's status is STOPPED and also that no tasks were spawned for the connector
connect.assertions().assertConnectorIsStopped(
CONNECTOR_NAME,
"Connector was not created in a stopped state"
);
assertEquals(Collections.emptyList(), connect.connectorInfo(CONNECTOR_NAME).tasks());
assertEquals(Collections.emptyList(), connect.taskConfigs(CONNECTOR_NAME));
// Verify that the offsets can be modified for a source connector created in the STOPPED state
// Alter the offsets so that only 5 messages are produced
connect.alterSourceConnectorOffset(
CONNECTOR_NAME,
Collections.singletonMap("task.id", CONNECTOR_NAME + "-0"),
Collections.singletonMap("saved", 5L)
);
// Verify that a connector created in the STOPPED state can be resumed successfully
connect.resumeConnector(CONNECTOR_NAME);
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
CONNECTOR_NAME,
1,
"Connector or tasks did not start running healthily in time"
);
// Verify that only 5 messages were produced. We verify this by consuming all the messages from the topic after we've already ensured that at
// least 5 messages can be consumed.
long timeoutMs = TimeUnit.SECONDS.toMillis(10);
connect.kafka().consume(5, timeoutMs, TOPIC_NAME);
assertEquals(5, connect.kafka().consumeAll(timeoutMs, TOPIC_NAME).count());
}
@Test
public void testCreateSinkConnectorWithStoppedInitialStateAndModifyOffsets() throws Exception {
connect = connectBuilder.build();
// start the clusters
connect.start();
connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
"Initial group of workers did not start in time.");
// Create topic and produce 10 messages
connect.kafka().createTopic(TOPIC_NAME);
for (int i = 0; i < 10; i++) {
connect.kafka().produce(TOPIC_NAME, "Message " + i);
}
Map<String, String> props = defaultSinkConnectorProps(TOPIC_NAME);
props.put(TASKS_MAX_CONFIG, "1");
CreateConnectorRequest createConnectorRequest = new CreateConnectorRequest(
CONNECTOR_NAME,
props,
CreateConnectorRequest.InitialState.STOPPED
);
connect.configureConnector(createConnectorRequest);
// Verify that the connector's status is STOPPED and also that no tasks were spawned for the connector
connect.assertions().assertConnectorIsStopped(
CONNECTOR_NAME,
"Connector was not created in a stopped state"
);
assertEquals(Collections.emptyList(), connect.connectorInfo(CONNECTOR_NAME).tasks());
assertEquals(Collections.emptyList(), connect.taskConfigs(CONNECTOR_NAME));
// Verify that the offsets can be modified for a sink connector created in the STOPPED state
// Alter the offsets so that the first 5 messages in the topic are skipped
connect.alterSinkConnectorOffset(CONNECTOR_NAME, new TopicPartition(TOPIC_NAME, 0), 5L);
// This will cause the connector task to fail if it encounters a record with offset < 5
TaskHandle taskHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME).taskHandle(CONNECTOR_NAME + "-0",
sinkRecord -> {
if (sinkRecord.kafkaOffset() < 5L) {
throw new ConnectException("Unexpected record encountered: " + sinkRecord);
}
});
// We produced 10 records and altered the connector offsets to skip over the first 5, so we expect 5 records to be consumed
taskHandle.expectedRecords(5);
// Verify that a connector created in the STOPPED state can be resumed successfully
connect.resumeConnector(CONNECTOR_NAME);
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
CONNECTOR_NAME,
1,
"Connector or tasks did not start running healthily in time"
);
taskHandle.awaitRecords(TimeUnit.SECONDS.toMillis(10));
// Confirm that the task is still running (i.e. it didn't fail due to encountering any records with offset < 5)
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
CONNECTOR_NAME,
1,
"Connector or tasks did not start running healthily in time"
);
}
@Test
public void testDeleteConnectorCreatedWithPausedOrStoppedInitialState() throws Exception {
connect = connectBuilder.build();
// start the clusters
connect.start();
connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
"Initial group of workers did not start in time.");
// Create a connector with PAUSED initial state
CreateConnectorRequest createConnectorRequest = new CreateConnectorRequest(
CONNECTOR_NAME,
defaultSourceConnectorProps(TOPIC_NAME),
CreateConnectorRequest.InitialState.PAUSED
);
connect.configureConnector(createConnectorRequest);
// Verify that the connector's status is PAUSED and also that no tasks were spawned for the connector
connect.assertions().assertConnectorAndExactlyNumTasksArePaused(
CONNECTOR_NAME,
0,
"Connector was not created in a paused state"
);
assertEquals(Collections.emptyList(), connect.connectorInfo(CONNECTOR_NAME).tasks());
assertEquals(Collections.emptyList(), connect.taskConfigs(CONNECTOR_NAME));
// Verify that a connector created in the PAUSED state can be deleted successfully
connect.deleteConnector(CONNECTOR_NAME);
connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME, "Connector wasn't deleted in time");
// Create a connector with STOPPED initial state
createConnectorRequest = new CreateConnectorRequest(
CONNECTOR_NAME,
defaultSourceConnectorProps(TOPIC_NAME),
CreateConnectorRequest.InitialState.STOPPED
);
connect.configureConnector(createConnectorRequest);
// Verify that the connector's status is STOPPED and also that no tasks were spawned for the connector
connect.assertions().assertConnectorIsStopped(
CONNECTOR_NAME,
"Connector was not created in a stopped state"
);
assertEquals(Collections.emptyList(), connect.connectorInfo(CONNECTOR_NAME).tasks());
assertEquals(Collections.emptyList(), connect.taskConfigs(CONNECTOR_NAME));
// Verify that a connector created in the STOPPED state can be deleted successfully
connect.deleteConnector(CONNECTOR_NAME);
connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME, "Connector wasn't deleted in time");
}
private Map<String, String> defaultSinkConnectorProps(String topics) {
// setup props for the sink connector
Map<String, String> props = new HashMap<>();
props.put(CONNECTOR_CLASS_CONFIG, MonitorableSinkConnector.class.getSimpleName());
props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
props.put(TOPICS_CONFIG, topics);
return props;
}
private Map<String, String> defaultSourceConnectorProps(String topic) {
// setup up props for the source connector
// setup props for the source connector
Map<String, String> props = new HashMap<>();
props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));

File: RestForwardingIntegrationTest.java

@@ -70,6 +70,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;
@@ -187,7 +188,7 @@ public class RestForwardingIntegrationTest {
followerCallbackCaptor.getValue().onCompletion(forwardException, null);
return null;
}).when(followerHerder)
.putConnectorConfig(any(), any(), anyBoolean(), followerCallbackCaptor.capture());
.putConnectorConfig(any(), any(), isNull(), anyBoolean(), followerCallbackCaptor.capture());
// Leader will reply
ConnectorInfo connectorInfo = new ConnectorInfo("blah", Collections.emptyMap(), Collections.emptyList(), ConnectorType.SOURCE);
@@ -197,7 +198,7 @@ public class RestForwardingIntegrationTest {
leaderCallbackCaptor.getValue().onCompletion(null, leaderAnswer);
return null;
}).when(leaderHerder)
.putConnectorConfig(any(), any(), anyBoolean(), leaderCallbackCaptor.capture());
.putConnectorConfig(any(), any(), isNull(), anyBoolean(), leaderCallbackCaptor.capture());
// Client makes request to the follower
URI followerUrl = followerServer.advertisedUrl();

File: StandaloneWorkerIntegrationTest.java

@@ -17,7 +17,9 @@
package org.apache.kafka.connect.integration;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectStandalone;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
@@ -26,12 +28,21 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC_CREATION_PREFIX;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -41,6 +52,10 @@ import static org.junit.Assert.assertTrue;
@Category(IntegrationTest.class)
public class StandaloneWorkerIntegrationTest {
private static final String CONNECTOR_NAME = "test-connector";
private static final int NUM_TASKS = 4;
private static final String TOPIC_NAME = "test-topic";
private EmbeddedConnectStandalone connect;
@Before
@@ -202,4 +217,42 @@
return entry.getValue().level();
}
@Test
public void testCreateConnectorWithStoppedInitialState() throws Exception {
CreateConnectorRequest createConnectorRequest = new CreateConnectorRequest(
CONNECTOR_NAME,
defaultSourceConnectorProps(TOPIC_NAME),
CreateConnectorRequest.InitialState.STOPPED
);
connect.configureConnector(createConnectorRequest);
// Verify that the connector's status is STOPPED and also that no tasks were spawned for the connector
connect.assertions().assertConnectorIsStopped(
CONNECTOR_NAME,
"Connector was not created in a stopped state"
);
assertEquals(Collections.emptyList(), connect.connectorInfo(CONNECTOR_NAME).tasks());
assertEquals(Collections.emptyList(), connect.taskConfigs(CONNECTOR_NAME));
// Verify that a connector created in the STOPPED state can be resumed successfully
connect.resumeConnector(CONNECTOR_NAME);
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
CONNECTOR_NAME,
NUM_TASKS,
"Connector or tasks did not start running healthily in time"
);
}
private Map<String, String> defaultSourceConnectorProps(String topic) {
// setup props for the source connector
Map<String, String> props = new HashMap<>();
props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
props.put(TOPIC_CONFIG, topic);
props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(DEFAULT_TOPIC_CREATION_PREFIX + REPLICATION_FACTOR_CONFIG, String.valueOf(1));
props.put(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG, String.valueOf(1));
return props;
}
}

File: DistributedHerderTest.java

@@ -690,7 +690,7 @@ public class DistributedHerderTest {
}).when(herder).validateConnectorConfig(eq(CONN2_CONFIG), validateCallback.capture());
// CONN2 is new, should succeed
doNothing().when(configBackingStore).putConnectorConfig(CONN2, CONN2_CONFIG);
doNothing().when(configBackingStore).putConnectorConfig(eq(CONN2), eq(CONN2_CONFIG), isNull());
// This will occur just before/during the second tick
doNothing().when(member).ensureActive();
@@ -713,6 +713,51 @@
verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore, putConnectorCallback);
}
@Test
public void testCreateConnectorWithInitialState() throws Exception {
when(member.memberId()).thenReturn("leader");
when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
expectConfigRefreshAndSnapshot(SNAPSHOT);
when(statusBackingStore.connectors()).thenReturn(Collections.emptySet());
doNothing().when(member).poll(anyLong());
// Initial rebalance where this member becomes the leader
herder.tick();
// mock the actual validation since its asynchronous nature is difficult to test and should
// be covered sufficiently by the unit tests for the AbstractHerder class
ArgumentCaptor<Callback<ConfigInfos>> validateCallback = ArgumentCaptor.forClass(Callback.class);
doAnswer(invocation -> {
validateCallback.getValue().onCompletion(null, CONN2_CONFIG_INFOS);
return null;
}).when(herder).validateConnectorConfig(eq(CONN2_CONFIG), validateCallback.capture());
// CONN2 is new, should succeed
doNothing().when(configBackingStore).putConnectorConfig(eq(CONN2), eq(CONN2_CONFIG), eq(TargetState.STOPPED));
// This will occur just before/during the second tick
doNothing().when(member).ensureActive();
// No immediate action besides this -- change will be picked up via the config log
herder.putConnectorConfig(CONN2, CONN2_CONFIG, TargetState.STOPPED, false, putConnectorCallback);
// This tick runs the initial herder request, which issues an asynchronous request for
// connector validation
herder.tick();
// Once that validation is complete, another request is added to the herder request queue
// for actually performing the config write; this tick is for that request
herder.tick();
time.sleep(1000L);
assertStatistics(3, 1, 100, 1000L);
ConnectorInfo info = new ConnectorInfo(CONN2, CONN2_CONFIG, Collections.emptyList(), ConnectorType.SOURCE);
verify(putConnectorCallback).onCompletion(isNull(), eq(new Herder.Created<>(true, info)));
verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore, putConnectorCallback);
}
@Test
public void testCreateConnectorConfigBackingStoreError() {
when(member.memberId()).thenReturn("leader");
@@ -735,7 +780,7 @@
}).when(herder).validateConnectorConfig(eq(CONN2_CONFIG), validateCallback.capture());
doThrow(new ConnectException("Error writing connector configuration to Kafka"))
.when(configBackingStore).putConnectorConfig(CONN2, CONN2_CONFIG);
.when(configBackingStore).putConnectorConfig(eq(CONN2), eq(CONN2_CONFIG), isNull());
// This will occur just before/during the second tick
doNothing().when(member).ensureActive();
@@ -2184,7 +2229,7 @@
// Simulate response to writing config + waiting until end of log to be read
configUpdateListener.onConnectorConfigUpdate(CONN1);
return null;
}).when(configBackingStore).putConnectorConfig(eq(CONN1), eq(CONN1_CONFIG_UPDATED));
}).when(configBackingStore).putConnectorConfig(eq(CONN1), eq(CONN1_CONFIG_UPDATED), isNull());
// As a result of reconfig, should need to update snapshot. With only connector updates, we'll just restart
// connector without rebalance

File: CreateConnectorRequestTest.java (new file)

@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime.rest.entities;
import org.apache.kafka.connect.runtime.TargetState;
import org.junit.Test;
import java.util.Collections;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
public class CreateConnectorRequestTest {
@Test
public void testToTargetState() {
assertEquals(TargetState.STARTED, CreateConnectorRequest.InitialState.RUNNING.toTargetState());
assertEquals(TargetState.PAUSED, CreateConnectorRequest.InitialState.PAUSED.toTargetState());
assertEquals(TargetState.STOPPED, CreateConnectorRequest.InitialState.STOPPED.toTargetState());
CreateConnectorRequest createConnectorRequest = new CreateConnectorRequest("test-name", Collections.emptyMap(), null);
assertNull(createConnectorRequest.initialTargetState());
}
@Test
public void testForValue() {
assertEquals(CreateConnectorRequest.InitialState.RUNNING, CreateConnectorRequest.InitialState.forValue("running"));
assertEquals(CreateConnectorRequest.InitialState.RUNNING, CreateConnectorRequest.InitialState.forValue("Running"));
assertEquals(CreateConnectorRequest.InitialState.RUNNING, CreateConnectorRequest.InitialState.forValue("RUNNING"));
assertEquals(CreateConnectorRequest.InitialState.PAUSED, CreateConnectorRequest.InitialState.forValue("paused"));
assertEquals(CreateConnectorRequest.InitialState.PAUSED, CreateConnectorRequest.InitialState.forValue("Paused"));
assertEquals(CreateConnectorRequest.InitialState.PAUSED, CreateConnectorRequest.InitialState.forValue("PAUSED"));
assertEquals(CreateConnectorRequest.InitialState.STOPPED, CreateConnectorRequest.InitialState.forValue("stopped"));
assertEquals(CreateConnectorRequest.InitialState.STOPPED, CreateConnectorRequest.InitialState.forValue("Stopped"));
assertEquals(CreateConnectorRequest.InitialState.STOPPED, CreateConnectorRequest.InitialState.forValue("STOPPED"));
}
}

File: ConnectorsResourceTest.java

@@ -23,6 +23,7 @@ import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.RestartRequest;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.distributed.NotAssignedException;
import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException;
@@ -272,23 +273,64 @@ public class ConnectorsResourceTest {
@Test
public void testCreateConnector() throws Throwable {
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME), null);
final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG,
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
).when(herder).putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()), eq(false), cb.capture());
).when(herder).putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()), isNull(), eq(false), cb.capture());
connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
}
@Test
public void testCreateConnectorWithPausedInitialState() throws Throwable {
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME), CreateConnectorRequest.InitialState.PAUSED);
final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG,
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
).when(herder).putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()), eq(TargetState.PAUSED), eq(false), cb.capture());
connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
}
@Test
public void testCreateConnectorWithStoppedInitialState() throws Throwable {
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME), CreateConnectorRequest.InitialState.STOPPED);
final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG,
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
).when(herder).putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()), eq(TargetState.STOPPED), eq(false), cb.capture());
connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
}
@Test
public void testCreateConnectorWithRunningInitialState() throws Throwable {
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME), CreateConnectorRequest.InitialState.RUNNING);
final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG,
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
).when(herder).putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()), eq(TargetState.STARTED), eq(false), cb.capture());
connectorsResource.createConnector(FORWARD, NULL_HEADERS, body);
}
@Test
public void testCreateConnectorNotLeader() throws Throwable {
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME), null);
final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackNotLeaderException(cb).when(herder)
.putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()), eq(false), cb.capture());
.putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()), isNull(), eq(false), cb.capture());
when(restClient.httpRequest(eq(LEADER_URL + "connectors?forward=false"), eq("POST"), isNull(), eq(body), any()))
.thenReturn(new RestClient.HttpResponse<>(201, new HashMap<>(), new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG, CONNECTOR_TASK_NAMES, ConnectorType.SOURCE)));
@@ -297,11 +339,12 @@
@Test
public void testCreateConnectorWithHeaders() throws Throwable {
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME), null);
final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
HttpHeaders httpHeaders = mock(HttpHeaders.class);
expectAndCallbackNotLeaderException(cb)
.when(herder).putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()), eq(false), cb.capture());
.when(herder).putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()), isNull(), eq(false), cb.capture());
when(restClient.httpRequest(eq(LEADER_URL + "connectors?forward=false"), eq("POST"), eq(httpHeaders), any(), any()))
.thenReturn(new RestClient.HttpResponse<>(202, new HashMap<>(), null));
@@ -310,11 +353,12 @@
@Test
public void testCreateConnectorExists() {
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME));
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME), null);
final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackException(cb, new AlreadyExistsException("already exists"))
.when(herder).putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()), eq(false), cb.capture());
.when(herder).putConnectorConfig(eq(CONNECTOR_NAME), eq(body.config()), isNull(), eq(false), cb.capture());
assertThrows(AlreadyExistsException.class, () -> connectorsResource.createConnector(FORWARD, NULL_HEADERS, body));
}
@@ -323,13 +367,13 @@
// Clone CONNECTOR_CONFIG_WITHOUT_NAME Map, as createConnector changes it (puts the name in it) and this
// will affect later tests
Map<String, String> inputConfig = getConnectorConfig(CONNECTOR_CONFIG_WITHOUT_NAME);
final CreateConnectorRequest bodyIn = new CreateConnectorRequest(CONNECTOR_NAME_PADDING_WHITESPACES, inputConfig);
final CreateConnectorRequest bodyOut = new CreateConnectorRequest(CONNECTOR_NAME, CONNECTOR_CONFIG);
final CreateConnectorRequest bodyIn = new CreateConnectorRequest(CONNECTOR_NAME_PADDING_WHITESPACES, inputConfig, null);
final CreateConnectorRequest bodyOut = new CreateConnectorRequest(CONNECTOR_NAME, CONNECTOR_CONFIG, null);
final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(bodyOut.name(), bodyOut.config(),
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
).when(herder).putConnectorConfig(eq(bodyOut.name()), eq(bodyOut.config()), eq(false), cb.capture());
).when(herder).putConnectorConfig(eq(bodyOut.name()), eq(bodyOut.config()), isNull(), eq(false), cb.capture());
connectorsResource.createConnector(FORWARD, NULL_HEADERS, bodyIn);
}
@@ -339,13 +383,13 @@
// Clone CONNECTOR_CONFIG_WITHOUT_NAME Map, as createConnector changes it (puts the name in it) and this
// will affect later tests
Map<String, String> inputConfig = getConnectorConfig(CONNECTOR_CONFIG_WITHOUT_NAME);
final CreateConnectorRequest bodyIn = new CreateConnectorRequest(CONNECTOR_NAME_ALL_WHITESPACES, inputConfig);
final CreateConnectorRequest bodyOut = new CreateConnectorRequest("", CONNECTOR_CONFIG_WITH_EMPTY_NAME);
final CreateConnectorRequest bodyIn = new CreateConnectorRequest(CONNECTOR_NAME_ALL_WHITESPACES, inputConfig, null);
final CreateConnectorRequest bodyOut = new CreateConnectorRequest("", CONNECTOR_CONFIG_WITH_EMPTY_NAME, null);
final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(bodyOut.name(), bodyOut.config(),
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
).when(herder).putConnectorConfig(eq(bodyOut.name()), eq(bodyOut.config()), eq(false), cb.capture());
).when(herder).putConnectorConfig(eq(bodyOut.name()), eq(bodyOut.config()), isNull(), eq(false), cb.capture());
connectorsResource.createConnector(FORWARD, NULL_HEADERS, bodyIn);
}
@ -355,13 +399,13 @@ public class ConnectorsResourceTest {
// Clone CONNECTOR_CONFIG_WITHOUT_NAME Map, as createConnector changes it (puts the name in it) and this
// will affect later tests
Map<String, String> inputConfig = getConnectorConfig(CONNECTOR_CONFIG_WITHOUT_NAME);
final CreateConnectorRequest bodyIn = new CreateConnectorRequest(null, inputConfig);
final CreateConnectorRequest bodyOut = new CreateConnectorRequest("", CONNECTOR_CONFIG_WITH_EMPTY_NAME);
final CreateConnectorRequest bodyIn = new CreateConnectorRequest(null, inputConfig, null);
final CreateConnectorRequest bodyOut = new CreateConnectorRequest("", CONNECTOR_CONFIG_WITH_EMPTY_NAME, null);
final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(bodyOut.name(), bodyOut.config(),
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
).when(herder).putConnectorConfig(eq(bodyOut.name()), eq(bodyOut.config()), eq(false), cb.capture());
).when(herder).putConnectorConfig(eq(bodyOut.name()), eq(bodyOut.config()), isNull(), eq(false), cb.capture());
connectorsResource.createConnector(FORWARD, NULL_HEADERS, bodyIn);
}
@ -476,12 +520,13 @@ public class ConnectorsResourceTest {
@Test
public void testCreateConnectorWithSpecialCharsInName() throws Throwable {
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME_SPECIAL_CHARS, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME_SPECIAL_CHARS));
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME_SPECIAL_CHARS,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME_SPECIAL_CHARS), null);
final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME_SPECIAL_CHARS, CONNECTOR_CONFIG,
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
).when(herder).putConnectorConfig(eq(CONNECTOR_NAME_SPECIAL_CHARS), eq(body.config()), eq(false), cb.capture());
).when(herder).putConnectorConfig(eq(CONNECTOR_NAME_SPECIAL_CHARS), eq(body.config()), isNull(), eq(false), cb.capture());
String rspLocation = connectorsResource.createConnector(FORWARD, NULL_HEADERS, body).getLocation().toString();
String decoded = new URI(rspLocation).getPath();
@ -490,12 +535,13 @@ public class ConnectorsResourceTest {
@Test
public void testCreateConnectorWithControlSequenceInName() throws Throwable {
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME_CONTROL_SEQUENCES1, Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME_CONTROL_SEQUENCES1));
CreateConnectorRequest body = new CreateConnectorRequest(CONNECTOR_NAME_CONTROL_SEQUENCES1,
Collections.singletonMap(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME_CONTROL_SEQUENCES1), null);
final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME_CONTROL_SEQUENCES1, CONNECTOR_CONFIG,
CONNECTOR_TASK_NAMES, ConnectorType.SOURCE))
).when(herder).putConnectorConfig(eq(CONNECTOR_NAME_CONTROL_SEQUENCES1), eq(body.config()), eq(false), cb.capture());
).when(herder).putConnectorConfig(eq(CONNECTOR_NAME_CONTROL_SEQUENCES1), eq(body.config()), isNull(), eq(false), cb.capture());
String rspLocation = connectorsResource.createConnector(FORWARD, NULL_HEADERS, body).getLocation().toString();
String decoded = new URI(rspLocation).getPath();
@ -540,7 +586,7 @@ public class ConnectorsResourceTest {
public void testCreateConnectorConfigNameMismatch() {
Map<String, String> connConfig = new HashMap<>();
connConfig.put(ConnectorConfig.NAME_CONFIG, "mismatched-name");
CreateConnectorRequest request = new CreateConnectorRequest(CONNECTOR_NAME, connConfig);
CreateConnectorRequest request = new CreateConnectorRequest(CONNECTOR_NAME, connConfig, null);
assertThrows(BadRequestException.class, () -> connectorsResource.createConnector(FORWARD, NULL_HEADERS, request));
}

View File

@ -247,6 +247,37 @@ public class StandaloneHerderTest {
PowerMock.verifyAll();
}
@Test
public void testCreateConnectorWithStoppedInitialState() throws Exception {
connector = PowerMock.createMock(BogusSinkConnector.class);
Map<String, String> config = connectorConfig(SourceSink.SINK);
Connector connectorMock = PowerMock.createMock(SinkConnector.class);
expectConfigValidation(connectorMock, false, config);
EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(connectorMock);
// Only the connector should be created; we expect no tasks to be spawned for a connector created with a paused or stopped initial state
Capture<Callback<TargetState>> onStart = EasyMock.newCapture();
worker.startConnector(eq(CONNECTOR_NAME), eq(config), EasyMock.anyObject(HerderConnectorContext.class),
eq(herder), eq(TargetState.STOPPED), EasyMock.capture(onStart));
EasyMock.expectLastCall().andAnswer(() -> {
onStart.getValue().onCompletion(null, TargetState.STOPPED);
return true;
});
EasyMock.expect(worker.isRunning(CONNECTOR_NAME)).andReturn(true).anyTimes();
EasyMock.expect(herder.connectorType(anyObject())).andReturn(ConnectorType.SINK);
PowerMock.replayAll();
herder.putConnectorConfig(CONNECTOR_NAME, config, TargetState.STOPPED, false, createCallback);
Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.SECONDS);
assertEquals(
new ConnectorInfo(CONNECTOR_NAME, connectorConfig(SourceSink.SINK), Collections.emptyList(), ConnectorType.SINK),
connectorInfo.result()
);
PowerMock.verifyAll();
}
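// A rough sketch (an assumption, not the verbatim herder code) of the behavior this test
// pins down: once the connector start callback completes, tasks are only configured when
// the resulting target state is STARTED, so a STOPPED initial state leaves the connector
// with an empty task list.
//
//   if (error == null && newState == TargetState.STARTED)
//       updateConnectorTasks(connectorName);   // hypothetical helper; skipped for STOPPED/PAUSED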
@Test
public void testDestroyConnector() throws Exception {
connector = PowerMock.createMock(BogusSourceConnector.class);

View File

@ -24,11 +24,11 @@ import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Field;
@ -77,12 +77,12 @@ import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_CONFIG;
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG;
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.GROUP_ID_CONFIG;
import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.INCLUDE_TASKS_FIELD_NAME;
import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.ONLY_FAILED_FIELD_NAME;
import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.READ_WRITE_TOTAL_TIMEOUT_MS;
import static org.apache.kafka.connect.storage.KafkaConfigBackingStore.RESTART_KEY;
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotSame;
@ -177,6 +177,10 @@ public class KafkaConfigBackingStoreTest {
"config-bytes-7".getBytes(), "config-bytes-8".getBytes(), "config-bytes-9".getBytes()
);
private static final List<byte[]> TARGET_STATES_SERIALIZED = Arrays.asList(
"started".getBytes(), "paused".getBytes(), "stopped".getBytes()
);
@Mock
private Converter converter;
@Mock
@ -320,14 +324,14 @@ public class KafkaConfigBackingStoreTest {
assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
// Writing should block until it is written and read back from Kafka
configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0));
configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), null);
configState = configStorage.snapshot();
assertEquals(1, configState.offset());
assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0)));
assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
// Second should also block and all configs should still be available
configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(1));
configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(1), null);
configState = configStorage.snapshot();
assertEquals(2, configState.offset());
assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0)));
@ -346,6 +350,55 @@ public class KafkaConfigBackingStoreTest {
PowerMock.verifyAll();
}
@Test
public void testPutConnectorConfigWithTargetState() throws Exception {
expectConfigure();
expectStart(Collections.emptyList(), Collections.emptyMap());
// We expect to write the target state first, followed by the config write and then a read to end
expectConvertWriteRead(
TARGET_STATE_KEYS.get(0), KafkaConfigBackingStore.TARGET_STATE_V1, TARGET_STATES_SERIALIZED.get(2),
"state.v2", TargetState.STOPPED.name());
// We don't expect the config update listener's onConnectorTargetStateChange hook to be invoked
expectConvertWriteRead(
CONNECTOR_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
"properties", SAMPLE_CONFIGS.get(0));
configUpdateListener.onConnectorConfigUpdate(CONNECTOR_IDS.get(0));
EasyMock.expectLastCall();
LinkedHashMap<String, byte[]> recordsToRead = new LinkedHashMap<>();
recordsToRead.put(TARGET_STATE_KEYS.get(0), TARGET_STATES_SERIALIZED.get(2));
recordsToRead.put(CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
expectReadToEnd(recordsToRead);
expectPartitionCount(1);
expectStop();
PowerMock.replayAll();
configStorage.setupAndCreateKafkaBasedLog(TOPIC, config);
configStorage.start();
// Null before writing
ClusterConfigState configState = configStorage.snapshot();
assertEquals(-1, configState.offset());
assertNull(configState.connectorConfig(CONNECTOR_IDS.get(0)));
assertNull(configState.targetState(CONNECTOR_IDS.get(0)));
// Writing should block until it is written and read back from Kafka
configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), TargetState.STOPPED);
configState = configStorage.snapshot();
assertEquals(2, configState.offset());
assertEquals(TargetState.STOPPED, configState.targetState(CONNECTOR_IDS.get(0)));
assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0)));
configStorage.stop();
PowerMock.verifyAll();
}
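// Illustrative sketch (an assumption about KafkaConfigBackingStore.putConnectorConfig, not
// the verbatim implementation) of the ordering this test expects: the initial target state
// record is published to the config topic before the connector config record, so a reader
// that discovers the new connector always sees its intended state alongside it.
//
//   if (targetState != null)
//       send(TARGET_STATE_KEY(connector), serializedTargetState);  // hypothetical helpers
//   send(CONNECTOR_KEY(connector), serializedConfig);
//   readToEnd(READ_WRITE_TOTAL_TIMEOUT_MS);   // block until both records are read back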
@Test
public void testPutConnectorConfigProducerError() throws Exception {
expectConfigure();
@ -373,7 +426,8 @@ public class KafkaConfigBackingStoreTest {
assertEquals(0, configState.connectors().size());
// verify that the producer exception from KafkaBasedLog::send is propagated
ConnectException e = assertThrows(ConnectException.class, () -> configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0)));
ConnectException e = assertThrows(ConnectException.class, () -> configStorage.putConnectorConfig(CONNECTOR_IDS.get(0),
SAMPLE_CONFIGS.get(0), null));
assertTrue(e.getMessage().contains("Error writing connector configuration to Kafka"));
configStorage.stop();
@ -505,16 +559,16 @@ public class KafkaConfigBackingStoreTest {
configStorage.putTaskCountRecord(CONNECTOR_IDS.get(0), 6);
// Should fail again when we get fenced out
assertThrows(PrivilegedWriteException.class, () -> configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0)));
assertThrows(PrivilegedWriteException.class, () -> configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0), null));
// Should fail if we retry without reclaiming write privileges
assertThrows(IllegalStateException.class, () -> configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0)));
assertThrows(IllegalStateException.class, () -> configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0), null));
// Should succeed even without write privileges (target states can be written by anyone)
configStorage.putTargetState(CONNECTOR_IDS.get(1), TargetState.PAUSED);
// Should succeed if we re-claim write privileges
configStorage.claimWritePrivileges();
configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0));
configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(0), null);
configStorage.stop();
@ -891,7 +945,6 @@ public class KafkaConfigBackingStoreTest {
expectRead(serializedAfterStartup, deserializedAfterStartup);
configUpdateListener.onConnectorTargetStateChange(CONNECTOR_IDS.get(0));
configUpdateListener.onConnectorTargetStateChange(CONNECTOR_IDS.get(1));
EasyMock.expectLastCall();
expectPartitionCount(1);

View File

@ -40,6 +40,7 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -66,7 +67,7 @@ public class MemoryConfigBackingStoreTest {
@Test
public void testPutConnectorConfig() {
configStore.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0));
configStore.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), null);
ClusterConfigState configState = configStore.snapshot();
assertTrue(configState.contains(CONNECTOR_IDS.get(0)));
@ -78,9 +79,24 @@ public class MemoryConfigBackingStoreTest {
verify(configUpdateListener).onConnectorConfigUpdate(eq(CONNECTOR_IDS.get(0)));
}
@Test
public void testPutConnectorConfigWithTargetState() {
configStore.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), TargetState.PAUSED);
ClusterConfigState configState = configStore.snapshot();
assertTrue(configState.contains(CONNECTOR_IDS.get(0)));
assertEquals(TargetState.PAUSED, configState.targetState(CONNECTOR_IDS.get(0)));
assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0)));
assertEquals(1, configState.connectors().size());
verify(configUpdateListener).onConnectorConfigUpdate(eq(CONNECTOR_IDS.get(0)));
// onConnectorTargetStateChange hook shouldn't be called when a connector is created with a specific initial target state
verify(configUpdateListener, never()).onConnectorTargetStateChange(eq(CONNECTOR_IDS.get(0)));
}
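// Simplified sketch (an assumption about MemoryConfigBackingStore, not the verbatim code) of
// why only onConnectorConfigUpdate fires above: the initial target state is stored as part of
// the same put, so there is no separate state transition for the listener to observe.
//
//   synchronized void putConnectorConfig(String connector, Map<String, String> properties, TargetState state) {
//       connectorConfigs.put(connector, properties);
//       if (state != null)
//           connectorTargetStates.put(connector, state);
//       if (updateListener != null)
//           updateListener.onConnectorConfigUpdate(connector);   // no onConnectorTargetStateChange here
//   }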
@Test
public void testPutConnectorConfigUpdateExisting() {
configStore.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0));
configStore.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), null);
ClusterConfigState configState = configStore.snapshot();
assertTrue(configState.contains(CONNECTOR_IDS.get(0)));
@ -89,7 +105,7 @@ public class MemoryConfigBackingStoreTest {
assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0)));
assertEquals(1, configState.connectors().size());
configStore.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(1));
configStore.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(1), null);
configState = configStore.snapshot();
assertEquals(SAMPLE_CONFIGS.get(1), configState.connectorConfig(CONNECTOR_IDS.get(0)));
@ -98,8 +114,8 @@ public class MemoryConfigBackingStoreTest {
@Test
public void testRemoveConnectorConfig() {
configStore.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0));
configStore.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(1));
configStore.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), null);
configStore.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(1), null);
ClusterConfigState configState = configStore.snapshot();
Set<String> expectedConnectors = new HashSet<>();
@ -124,7 +140,7 @@ public class MemoryConfigBackingStoreTest {
assertThrows(IllegalArgumentException.class,
() -> configStore.putTaskConfigs(CONNECTOR_IDS.get(0), Collections.singletonList(SAMPLE_CONFIGS.get(1))));
configStore.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0));
configStore.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), null);
configStore.putTaskConfigs(CONNECTOR_IDS.get(0), Collections.singletonList(SAMPLE_CONFIGS.get(1)));
ClusterConfigState configState = configStore.snapshot();
@ -151,7 +167,7 @@ public class MemoryConfigBackingStoreTest {
return null;
}).when(configUpdateListener).onTaskConfigUpdate(anySet());
configStore.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0));
configStore.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), null);
configStore.putTaskConfigs(CONNECTOR_IDS.get(0), Collections.singletonList(SAMPLE_CONFIGS.get(1)));
configStore.removeTaskConfigs(CONNECTOR_IDS.get(0));
ClusterConfigState configState = configStore.snapshot();
@ -171,7 +187,7 @@ public class MemoryConfigBackingStoreTest {
// Can't write target state for non-existent connector
assertThrows(IllegalArgumentException.class, () -> configStore.putTargetState(CONNECTOR_IDS.get(0), TargetState.PAUSED));
configStore.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0));
configStore.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), null);
configStore.putTargetState(CONNECTOR_IDS.get(0), TargetState.PAUSED);
// Ensure that ConfigBackingStore.UpdateListener::onConnectorTargetStateChange is called only once if the same state is written twice
configStore.putTargetState(CONNECTOR_IDS.get(0), TargetState.PAUSED);

View File

@ -30,6 +30,7 @@ import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
import org.apache.kafka.connect.runtime.rest.entities.ServerInfo;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
@ -187,7 +188,7 @@ abstract class EmbeddedConnect {
*
* @param connName the name of the connector
* @param connConfig the intended configuration
* @throws ConnectRestException if the REST api returns error status
* @throws ConnectRestException if the REST API returns error status
* @throws ConnectException if the configuration fails to be serialized or if the request could not be sent
*/
public String configureConnector(String connName, Map<String, String> connConfig) {
@ -195,6 +196,36 @@ abstract class EmbeddedConnect {
return putConnectorConfig(url, connConfig);
}
/**
* Configure a new connector using the <strong><em>POST /connectors</em></strong> endpoint. If the connector already exists, a
* {@link ConnectRestException} will be thrown.
*
* @param createConnectorRequest the connector creation request
* @throws ConnectRestException if the REST API returns error status
* @throws ConnectException if the request could not be sent
*/
public String configureConnector(CreateConnectorRequest createConnectorRequest) {
String url = endpointForResource("connectors");
ObjectMapper objectMapper = new ObjectMapper();
String requestBody;
try {
requestBody = objectMapper.writeValueAsString(createConnectorRequest);
} catch (IOException e) {
throw new ConnectException("Failed to serialize connector creation request: " + createConnectorRequest);
}
Response response = requestPost(url, requestBody, Collections.emptyMap());
if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) {
return responseToString(response);
} else {
throw new ConnectRestException(
response.getStatus(),
"Could not execute 'POST /connectors' request. Error response: " + responseToString(response)
);
}
}
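// Hypothetical usage sketch for the helper above; the connector name, config values, and
// the CreateConnectorRequest initial-state constant are illustrative assumptions.
//
//   Map<String, String> config = new HashMap<>();
//   config.put("connector.class", "org.apache.kafka.connect.file.FileStreamSourceConnector");
//   config.put("topic", "connect-test");
//   connect.configureConnector(new CreateConnectorRequest("example-source", config,
//       CreateConnectorRequest.InitialState.STOPPED));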
/**
* Validate a given connector configuration. If the configuration validates or
* has a configuration error, an instance of {@link ConfigInfos} is returned. If the validation fails

View File

@ -41,7 +41,7 @@
<p>In standalone mode all work is performed in a single process. This configuration is simpler to setup and get started with and may be useful in situations where only one worker makes sense (e.g. collecting log files), but it does not benefit from some of the features of Kafka Connect such as fault tolerance. You can start a standalone process with the following command:</p>
<pre class="brush: bash;">
&gt; bin/connect-standalone.sh config/connect-standalone.properties [connector1.properties connector2.properties ...]</pre>
&gt; bin/connect-standalone.sh config/connect-standalone.properties [connector1.properties connector2.json ...]</pre>
<p>The first parameter is the configuration for the worker. This includes settings such as the Kafka connection parameters, serialization format, and how frequently to commit offsets. The provided example should work well with a local cluster running with the default configuration provided by <code>config/server.properties</code>. It will require tweaking to use with a different configuration or production deployment. All workers (both standalone and distributed) require a few configs:</p>
<ul>
@ -60,7 +60,7 @@
<p>Starting with 2.3.0, client configuration overrides can be configured individually per connector by using the prefixes <code>producer.override.</code> and <code>consumer.override.</code> for Kafka sources or Kafka sinks respectively. These overrides are included with the rest of the connector's configuration properties.</p>
<p>The remaining parameters are connector configuration files. You may include as many as you want, but all will execute within the same process (on different threads). You can also choose not to specify any connector configuration files on the command line, and instead use the REST API to create connectors at runtime after your standalone worker starts.</p>
<p>The remaining parameters are connector configuration files. Each file may either be a Java Properties file or a JSON file containing an object with the same structure as the request body of either the <code>POST /connectors</code> endpoint or the <code>PUT /connectors/{name}/config</code> endpoint (see the <a href="/{{version}}/generated/connect_rest.yaml">OpenAPI documentation</a>). You may include as many as you want, but all will execute within the same process (on different threads). You can also choose not to specify any connector configuration files on the command line, and instead use the REST API to create connectors at runtime after your standalone worker starts.</p>
<p>Distributed mode handles automatic balancing of work, allows you to scale up (or down) dynamically, and offers fault tolerance both in the active tasks and for configuration and offset commit data. Execution is very similar to standalone mode:</p>
@ -293,7 +293,7 @@ listeners=http://localhost:8080,https://localhost:8443</pre>
<ul>
<li><code>GET /connectors</code> - return a list of active connectors</li>
<li><code>POST /connectors</code> - create a new connector; the request body should be a JSON object containing a string <code>name</code> field and an object <code>config</code> field with the connector configuration parameters</li>
<li><code>POST /connectors</code> - create a new connector; the request body should be a JSON object containing a string <code>name</code> field and an object <code>config</code> field with the connector configuration parameters. The JSON object may also optionally contain a string <code>initial_state</code> field which can take the following values - <code>STOPPED</code>, <code>PAUSED</code> or <code>RUNNING</code> (the default value)</li>
<li><code>GET /connectors/{name}</code> - get information about a specific connector</li>
<li><code>GET /connectors/{name}/config</code> - get the configuration parameters for a specific connector</li>
<li><code>PUT /connectors/{name}/config</code> - update the configuration parameters for a specific connector</li>