diff --git a/build.gradle b/build.gradle index 6edcfdd330c..32141081572 100644 --- a/build.gradle +++ b/build.gradle @@ -874,8 +874,8 @@ project(':connect:runtime') { testCompile libs.junit testCompile libs.powermock testCompile libs.powermockEasymock + testCompile project(":connect:json") - testRuntime project(":connect:json") testRuntime libs.slf4jlog4j } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java index 9569b4beae0..30869a41d0b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java @@ -53,6 +53,14 @@ public class ConnectorConfig extends AbstractConfig { " or use \"FileStreamSink\" or \"FileStreamSinkConnector\" to make the configuration a bit shorter"; private static final String CONNECTOR_CLASS_DISPLAY = "Connector class"; + public static final String KEY_CONVERTER_CLASS_CONFIG = WorkerConfig.KEY_CONVERTER_CLASS_CONFIG; + public static final String KEY_CONVERTER_CLASS_DOC = WorkerConfig.KEY_CONVERTER_CLASS_DOC; + public static final String KEY_CONVERTER_CLASS_DISPLAY = "Key converter class"; + + public static final String VALUE_CONVERTER_CLASS_CONFIG = WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG; + public static final String VALUE_CONVERTER_CLASS_DOC = WorkerConfig.VALUE_CONVERTER_CLASS_DOC; + public static final String VALUE_CONVERTER_CLASS_DISPLAY = "Value converter class"; + public static final String TASKS_MAX_CONFIG = "tasks.max"; private static final String TASKS_MAX_DOC = "Maximum number of tasks to use for this connector."; public static final int TASKS_MAX_DEFAULT = 1; @@ -64,7 +72,9 @@ public class ConnectorConfig extends AbstractConfig { return new ConfigDef() .define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC, COMMON_GROUP, 1, Width.MEDIUM, NAME_DISPLAY) 
.define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC, COMMON_GROUP, 2, Width.LONG, CONNECTOR_CLASS_DISPLAY) - .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, 3, Width.SHORT, TASK_MAX_DISPLAY); + .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, 3, Width.SHORT, TASK_MAX_DISPLAY) + .define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, KEY_CONVERTER_CLASS_DOC, COMMON_GROUP, 4, Width.SHORT, KEY_CONVERTER_CLASS_DISPLAY) + .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, VALUE_CONVERTER_CLASS_DOC, COMMON_GROUP, 5, Width.SHORT, VALUE_CONVERTER_CLASS_DISPLAY); } public ConnectorConfig() { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index a0664adff8f..d39806a9f6b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -66,8 +66,8 @@ public class Worker { private final Time time; private final String workerId; private final WorkerConfig config; - private final Converter keyConverter; - private final Converter valueConverter; + private final Converter defaultKeyConverter; + private final Converter defaultValueConverter; private final Converter internalKeyConverter; private final Converter internalValueConverter; private final OffsetBackingStore offsetBackingStore; @@ -85,10 +85,10 @@ public class Worker { this.workerId = workerId; this.time = time; this.config = config; - this.keyConverter = config.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class); - this.keyConverter.configure(config.originalsWithPrefix("key.converter."), true); - this.valueConverter = 
config.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class); - this.valueConverter.configure(config.originalsWithPrefix("value.converter."), false); + this.defaultKeyConverter = config.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class); + this.defaultKeyConverter.configure(config.originalsWithPrefix("key.converter."), true); + this.defaultValueConverter = config.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class); + this.defaultValueConverter.configure(config.originalsWithPrefix("value.converter."), false); this.internalKeyConverter = config.getConfiguredInstance(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, Converter.class); this.internalKeyConverter.configure(config.originalsWithPrefix("internal.key.converter."), true); this.internalValueConverter = config.getConfiguredInstance(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, Converter.class); @@ -302,11 +302,13 @@ public class Worker { * Add a new task. * @param id Globally unique ID for this task. 
* @param taskConfig the parsed task configuration + * @param connConfig the parsed connector configuration * @param statusListener listener for notifications of task status changes * @param initialState the initial target state that the task should be initialized to */ public void startTask(ConnectorTaskId id, TaskConfig taskConfig, + ConnectorConfig connConfig, TaskStatus.Listener statusListener, TargetState initialState) { log.info("Creating task {}", id); @@ -322,7 +324,18 @@ public class Worker { final Task task = instantiateTask(taskClass); log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName()); - final WorkerTask workerTask = buildWorkerTask(id, task, statusListener, initialState); + Converter keyConverter = connConfig.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class); + if (keyConverter != null) + keyConverter.configure(connConfig.originalsWithPrefix("key.converter."), true); + else + keyConverter = defaultKeyConverter; + Converter valueConverter = connConfig.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class); + if (valueConverter != null) + valueConverter.configure(connConfig.originalsWithPrefix("value.converter."), false); + else + valueConverter = defaultValueConverter; + + final WorkerTask workerTask = buildWorkerTask(id, task, statusListener, initialState, keyConverter, valueConverter); // Start the task before adding modifying any state, any exceptions are caught higher up the // call chain and there's no cleanup to do here @@ -339,7 +352,9 @@ public class Worker { private WorkerTask buildWorkerTask(ConnectorTaskId id, Task task, TaskStatus.Listener statusListener, - TargetState initialState) { + TargetState initialState, + Converter keyConverter, + Converter valueConverter) { // Decide which type of worker task we need based on the type of task. 
if (task instanceof SourceTask) { OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(), diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index afabbeb1e37..6232187d4ae 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -772,9 +772,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable { private void startTask(ConnectorTaskId taskId) { log.info("Starting task {}", taskId); TargetState initialState = configState.targetState(taskId.connector()); - Map configs = configState.taskConfig(taskId); - TaskConfig taskConfig = new TaskConfig(configs); - worker.startTask(taskId, taskConfig, this, initialState); + TaskConfig taskConfig = new TaskConfig(configState.taskConfig(taskId)); + ConnectorConfig connConfig = new ConnectorConfig(configState.connectorConfig(taskId.connector())); + worker.startTask(taskId, taskConfig, connConfig, this, initialState); } // Helper for starting a connector with the given name, which will extract & parse the config, generate connector diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfig.java index 8014b3aa87d..5637e05a9c7 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfig.java @@ -29,7 +29,7 @@ public class StandaloneConfig extends WorkerConfig { * offset.storage.file.filename */ public static final String OFFSET_STORAGE_FILE_FILENAME_CONFIG = 
"offset.storage.file.filename"; - private static final String OFFSET_STORAGE_FILE_FILENAME_DOC = "file to store offset data in"; + private static final String OFFSET_STORAGE_FILE_FILENAME_DOC = "File to store offset data in"; static { CONFIG = baseConfigDef() diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java index 8dbda74b8e7..cac8d18f486 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java @@ -206,14 +206,16 @@ public class StandaloneHerder extends AbstractHerder { if (!configState.contains(taskId.connector())) cb.onCompletion(new NotFoundException("Connector " + taskId.connector() + " not found", null), null); - Map taskConfig = configState.taskConfig(taskId); - if (taskConfig == null) + Map taskConfigProps = configState.taskConfig(taskId); + if (taskConfigProps == null) cb.onCompletion(new NotFoundException("Task " + taskId + " not found", null), null); + TaskConfig taskConfig = new TaskConfig(taskConfigProps); + ConnectorConfig connConfig = new ConnectorConfig(configState.connectorConfig(taskId.connector())); TargetState targetState = configState.targetState(taskId.connector()); try { worker.stopAndAwaitTask(taskId); - worker.startTask(taskId, new TaskConfig(taskConfig), this, targetState); + worker.startTask(taskId, taskConfig, connConfig, this, targetState); cb.onCompletion(null, null); } catch (Exception e) { log.error("Failed to restart task {}", taskId, e); @@ -270,11 +272,14 @@ public class StandaloneHerder extends AbstractHerder { } private void createConnectorTasks(String connName, TargetState initialState) { + Map connConfigs = configState.connectorConfig(connName); + ConnectorConfig connConfig = new ConnectorConfig(connConfigs); + for 
(ConnectorTaskId taskId : configState.tasks(connName)) { Map taskConfigMap = configState.taskConfig(taskId); - TaskConfig config = new TaskConfig(taskConfigMap); + TaskConfig taskConfig = new TaskConfig(taskConfigMap); try { - worker.startTask(taskId, config, this, initialState); + worker.startTask(taskId, taskConfig, connConfig, this, initialState); } catch (Throwable e) { log.error("Failed to add task {}: ", taskId, e); // Swallow this so we can continue updating the rest of the tasks diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index ec4f0253c1e..f9839f5fb64 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -23,7 +23,10 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.ConnectorContext; import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.sink.SinkTask; import org.apache.kafka.connect.source.SourceRecord; @@ -35,6 +38,7 @@ import org.apache.kafka.connect.storage.OffsetStorageWriter; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.MockTime; import org.apache.kafka.connect.util.ThreadedTest; +import org.easymock.Capture; import org.easymock.EasyMock; import org.junit.Before; import org.junit.Test; @@ -352,8 +356,8 @@ public class WorkerTest extends ThreadedTest { EasyMock.eq(task), EasyMock.anyObject(TaskStatus.Listener.class), EasyMock.eq(TargetState.STARTED), - 
EasyMock.anyObject(Converter.class), - EasyMock.anyObject(Converter.class), + EasyMock.anyObject(JsonConverter.class), + EasyMock.anyObject(JsonConverter.class), EasyMock.anyObject(KafkaProducer.class), EasyMock.anyObject(OffsetStorageReader.class), EasyMock.anyObject(OffsetStorageWriter.class), @@ -380,7 +384,7 @@ public class WorkerTest extends ThreadedTest { worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore); worker.start(); assertEquals(Collections.emptySet(), worker.taskIds()); - worker.startTask(TASK_ID, new TaskConfig(origProps), taskStatusListener, TargetState.STARTED); + worker.startTask(TASK_ID, new TaskConfig(origProps), anyConnectorConfig(), taskStatusListener, TargetState.STARTED); assertEquals(new HashSet<>(Arrays.asList(TASK_ID)), worker.taskIds()); worker.stopAndAwaitTask(TASK_ID); assertEquals(Collections.emptySet(), worker.taskIds()); @@ -420,8 +424,8 @@ public class WorkerTest extends ThreadedTest { EasyMock.eq(task), EasyMock.anyObject(TaskStatus.Listener.class), EasyMock.eq(TargetState.STARTED), - EasyMock.anyObject(Converter.class), - EasyMock.anyObject(Converter.class), + EasyMock.anyObject(JsonConverter.class), + EasyMock.anyObject(JsonConverter.class), EasyMock.anyObject(KafkaProducer.class), EasyMock.anyObject(OffsetStorageReader.class), EasyMock.anyObject(OffsetStorageWriter.class), @@ -449,12 +453,79 @@ public class WorkerTest extends ThreadedTest { worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore); worker.start(); - worker.startTask(TASK_ID, new TaskConfig(origProps), taskStatusListener, TargetState.STARTED); + worker.startTask(TASK_ID, new TaskConfig(origProps), anyConnectorConfig(), taskStatusListener, TargetState.STARTED); worker.stop(); PowerMock.verifyAll(); } + @Test + public void testConverterOverrides() throws Exception { + expectStartStorage(); + + TestSourceTask task = PowerMock.createMock(TestSourceTask.class); + WorkerSourceTask workerTask = 
PowerMock.createMock(WorkerSourceTask.class); + EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID); + + PowerMock.mockStaticPartial(Worker.class, "instantiateTask"); + PowerMock.expectPrivate(Worker.class, "instantiateTask", new Object[]{TestSourceTask.class}).andReturn(task); + EasyMock.expect(task.version()).andReturn("1.0"); + + Capture keyConverter = EasyMock.newCapture(); + Capture valueConverter = EasyMock.newCapture(); + + PowerMock.expectNew( + WorkerSourceTask.class, EasyMock.eq(TASK_ID), + EasyMock.eq(task), + EasyMock.anyObject(TaskStatus.Listener.class), + EasyMock.eq(TargetState.STARTED), + EasyMock.capture(keyConverter), + EasyMock.capture(valueConverter), + EasyMock.anyObject(KafkaProducer.class), + EasyMock.anyObject(OffsetStorageReader.class), + EasyMock.anyObject(OffsetStorageWriter.class), + EasyMock.anyObject(WorkerConfig.class), + EasyMock.anyObject(Time.class)) + .andReturn(workerTask); + Map origProps = new HashMap<>(); + origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName()); + workerTask.initialize(new TaskConfig(origProps)); + EasyMock.expectLastCall(); + workerTask.run(); + EasyMock.expectLastCall(); + + // Remove + workerTask.stop(); + EasyMock.expectLastCall(); + EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true); + EasyMock.expectLastCall(); + + expectStopStorage(); + + PowerMock.replayAll(); + + worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore); + worker.start(); + assertEquals(Collections.emptySet(), worker.taskIds()); + Map connProps = anyConnectorConfigMap(); + connProps.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, TestConverter.class.getName()); + connProps.put("key.converter.extra.config", "foo"); + connProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, TestConverter.class.getName()); + connProps.put("value.converter.extra.config", "bar"); + worker.startTask(TASK_ID, new TaskConfig(origProps), new ConnectorConfig(connProps), taskStatusListener, 
TargetState.STARTED); + assertEquals(new HashSet<>(Arrays.asList(TASK_ID)), worker.taskIds()); + worker.stopAndAwaitTask(TASK_ID); + assertEquals(Collections.emptySet(), worker.taskIds()); + // Nothing should be left, so this should effectively be a nop + worker.stop(); + + // Validate extra configs got passed through to overridden converters + assertEquals("foo", keyConverter.getValue().configs.get("extra.config")); + assertEquals("bar", valueConverter.getValue().configs.get("extra.config")); + + PowerMock.verifyAll(); + } + private void expectStartStorage() { offsetBackingStore.configure(EasyMock.anyObject(WorkerConfig.class)); EasyMock.expectLastCall(); @@ -467,6 +538,17 @@ public class WorkerTest extends ThreadedTest { EasyMock.expectLastCall(); } + private Map anyConnectorConfigMap() { + Map props = new HashMap<>(); + props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID); + props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName()); + props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1"); + return props; + } + + private ConnectorConfig anyConnectorConfig() { + return new ConnectorConfig(anyConnectorConfigMap()); + } /* Name here needs to be unique as we are testing the aliasing mechanism */ public static class WorkerTestConnector extends Connector { @@ -527,4 +609,23 @@ public class WorkerTest extends ThreadedTest { public void stop() { } } + + public static class TestConverter implements Converter { + public Map configs; + + @Override + public void configure(Map configs, boolean isKey) { + this.configs = configs; + } + + @Override + public byte[] fromConnectData(String topic, Schema schema, Object value) { + return new byte[0]; + } + + @Override + public SchemaAndValue toConnectData(String topic, byte[] value) { + return null; + } + } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index 747db1af53d..8fc6dbd3562 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -175,7 +175,7 @@ public class DistributedHerderTest { EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); - worker.startTask(EasyMock.eq(TASK1), EasyMock.anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); + worker.startTask(EasyMock.eq(TASK1), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); PowerMock.expectLastCall(); member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); @@ -198,7 +198,7 @@ public class DistributedHerderTest { PowerMock.expectLastCall(); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); - worker.startTask(EasyMock.eq(TASK1), EasyMock.anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); + worker.startTask(EasyMock.eq(TASK1), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); PowerMock.expectLastCall(); member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); @@ -234,7 +234,7 @@ public class DistributedHerderTest { PowerMock.expectLastCall(); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); - worker.startTask(EasyMock.eq(TASK1), EasyMock.anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); + worker.startTask(EasyMock.eq(TASK1), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); 
PowerMock.expectLastCall(); member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); @@ -530,7 +530,7 @@ public class DistributedHerderTest { expectPostRebalanceCatchup(SNAPSHOT); member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); - worker.startTask(EasyMock.eq(TASK0), EasyMock.anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); + worker.startTask(EasyMock.eq(TASK0), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); PowerMock.expectLastCall(); // now handle the task restart @@ -545,7 +545,7 @@ public class DistributedHerderTest { worker.stopAndAwaitTask(TASK0); PowerMock.expectLastCall(); - worker.startTask(EasyMock.eq(TASK0), EasyMock.anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); + worker.startTask(EasyMock.eq(TASK0), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); PowerMock.expectLastCall(); PowerMock.replayAll(); @@ -841,7 +841,7 @@ public class DistributedHerderTest { // join expectRebalance(1, Collections.emptyList(), Collections.singletonList(TASK0)); expectPostRebalanceCatchup(SNAPSHOT); - worker.startTask(EasyMock.eq(TASK0), EasyMock.anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); + worker.startTask(EasyMock.eq(TASK0), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); PowerMock.expectLastCall(); member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); @@ -877,7 +877,7 @@ public class DistributedHerderTest { // join expectRebalance(1, Collections.emptyList(), Collections.singletonList(TASK0)); expectPostRebalanceCatchup(SNAPSHOT); - worker.startTask(EasyMock.eq(TASK0), EasyMock.anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); + worker.startTask(EasyMock.eq(TASK0), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); PowerMock.expectLastCall(); 
member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); @@ -918,7 +918,7 @@ public class DistributedHerderTest { // join expectRebalance(1, Collections.emptyList(), Collections.singletonList(TASK0)); expectPostRebalanceCatchup(SNAPSHOT_PAUSED_CONN1); - worker.startTask(EasyMock.eq(TASK0), EasyMock.anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.PAUSED)); + worker.startTask(EasyMock.eq(TASK0), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.PAUSED)); PowerMock.expectLastCall(); member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); @@ -970,7 +970,7 @@ public class DistributedHerderTest { expectRebalance(Collections.emptyList(), Collections.emptyList(), ConnectProtocol.Assignment.NO_ERROR, 1, Collections.emptyList(), Arrays.asList(TASK0)); - worker.startTask(EasyMock.eq(TASK0), EasyMock.anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); + worker.startTask(EasyMock.eq(TASK0), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); PowerMock.expectLastCall(); member.poll(EasyMock.anyInt()); PowerMock.expectLastCall(); @@ -1008,7 +1008,7 @@ public class DistributedHerderTest { EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); PowerMock.expectLastCall(); EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); - worker.startTask(EasyMock.eq(TASK1), EasyMock.anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); + worker.startTask(EasyMock.eq(TASK1), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); PowerMock.expectLastCall(); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); member.poll(EasyMock.anyInt()); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java index 
e70b968d420..3772586bbe5 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java @@ -73,6 +73,10 @@ public class StandaloneHerderTest { private static final int DEFAULT_MAX_TASKS = 1; private static final String WORKER_ID = "localhost:8083"; + private enum SourceSink { + SOURCE, SINK + }; + private StandaloneHerder herder; private Connector connector; @@ -88,11 +92,11 @@ public class StandaloneHerderTest { @Test public void testCreateSourceConnector() throws Exception { connector = PowerMock.createMock(BogusSourceConnector.class); - expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false); + expectAdd(SourceSink.SOURCE); PowerMock.replayAll(); - herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback); + herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback); PowerMock.verifyAll(); } @@ -101,7 +105,7 @@ public class StandaloneHerderTest { public void testCreateConnectorAlreadyExists() throws Exception { connector = PowerMock.createMock(BogusSourceConnector.class); // First addition should succeed - expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false); + expectAdd(SourceSink.SOURCE); // Second should fail createCallback.onCompletion(EasyMock.anyObject(), EasyMock.>isNull()); @@ -109,8 +113,8 @@ public class StandaloneHerderTest { PowerMock.replayAll(); - herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback); - herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback); + herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback); + 
herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback); PowerMock.verifyAll(); } @@ -118,11 +122,11 @@ public class StandaloneHerderTest { @Test public void testCreateSinkConnector() throws Exception { connector = PowerMock.createMock(BogusSinkConnector.class); - expectAdd(CONNECTOR_NAME, BogusSinkConnector.class, BogusSinkTask.class, true); + expectAdd(SourceSink.SINK); PowerMock.replayAll(); - herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSinkConnector.class, true), false, createCallback); + herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SINK), false, createCallback); PowerMock.verifyAll(); } @@ -130,7 +134,7 @@ public class StandaloneHerderTest { @Test public void testDestroyConnector() throws Exception { connector = PowerMock.createMock(BogusSourceConnector.class); - expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false); + expectAdd(SourceSink.SOURCE); EasyMock.expect(statusBackingStore.getAll(CONNECTOR_NAME)).andReturn(Collections.emptyList()); statusBackingStore.put(new ConnectorStatus(CONNECTOR_NAME, AbstractStatus.State.DESTROYED, WORKER_ID, 0)); @@ -139,7 +143,7 @@ public class StandaloneHerderTest { PowerMock.replayAll(); - herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback); + herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback); FutureCallback> futureCb = new FutureCallback<>(); herder.putConnectorConfig(CONNECTOR_NAME, null, true, futureCb); futureCb.get(1000L, TimeUnit.MILLISECONDS); @@ -159,18 +163,18 @@ public class StandaloneHerderTest { @Test public void testRestartConnector() throws Exception { - expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false); + expectAdd(SourceSink.SOURCE); worker.stopConnector(CONNECTOR_NAME); EasyMock.expectLastCall(); - 
worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false))), + worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorConfig(SourceSink.SOURCE))), EasyMock.anyObject(HerderConnectorContext.class), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); EasyMock.expectLastCall(); PowerMock.replayAll(); - herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback); + herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback); FutureCallback cb = new FutureCallback<>(); herder.restartConnector(CONNECTOR_NAME, cb); @@ -181,7 +185,7 @@ public class StandaloneHerderTest { @Test public void testRestartConnectorFailureOnStop() throws Exception { - expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false); + expectAdd(SourceSink.SOURCE); RuntimeException e = new RuntimeException(); worker.stopConnector(CONNECTOR_NAME); @@ -191,7 +195,7 @@ public class StandaloneHerderTest { PowerMock.replayAll(); - herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback); + herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback); FutureCallback cb = new FutureCallback<>(); herder.restartConnector(CONNECTOR_NAME, cb); @@ -207,19 +211,19 @@ public class StandaloneHerderTest { @Test public void testRestartConnectorFailureOnStart() throws Exception { - expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false); + expectAdd(SourceSink.SOURCE); worker.stopConnector(CONNECTOR_NAME); EasyMock.expectLastCall(); RuntimeException e = new RuntimeException(); - worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false))), + worker.startConnector(EasyMock.eq(new 
ConnectorConfig(connectorConfig(SourceSink.SOURCE))), EasyMock.anyObject(HerderConnectorContext.class), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); EasyMock.expectLastCall().andThrow(e); PowerMock.replayAll(); - herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback); + herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback); FutureCallback cb = new FutureCallback<>(); herder.restartConnector(CONNECTOR_NAME, cb); @@ -236,18 +240,19 @@ public class StandaloneHerderTest { @Test public void testRestartTask() throws Exception { ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0); - expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false); + expectAdd(SourceSink.SOURCE); worker.stopAndAwaitTask(taskId); EasyMock.expectLastCall(); - Map generatedTaskProps = taskConfig(BogusSourceTask.class, false); - worker.startTask(taskId, new TaskConfig(generatedTaskProps), herder, TargetState.STARTED); + ConnectorConfig connConfig = new ConnectorConfig(connectorConfig(SourceSink.SOURCE)); + TaskConfig taskConfig = new TaskConfig(taskConfig(SourceSink.SOURCE)); + worker.startTask(taskId, taskConfig, connConfig, herder, TargetState.STARTED); EasyMock.expectLastCall(); PowerMock.replayAll(); - herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback); + herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback); FutureCallback cb = new FutureCallback<>(); herder.restartTask(taskId, cb); @@ -259,7 +264,7 @@ public class StandaloneHerderTest { @Test public void testRestartTaskFailureOnStop() throws Exception { ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0); - expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false); + expectAdd(SourceSink.SOURCE); 
RuntimeException e = new RuntimeException(); worker.stopAndAwaitTask(taskId); @@ -269,7 +274,7 @@ public class StandaloneHerderTest { PowerMock.replayAll(); - herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback); + herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback); FutureCallback cb = new FutureCallback<>(); herder.restartTask(taskId, cb); @@ -285,19 +290,20 @@ public class StandaloneHerderTest { @Test public void testRestartTaskFailureOnStart() throws Exception { ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0); - expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false); + expectAdd(SourceSink.SOURCE); worker.stopAndAwaitTask(taskId); EasyMock.expectLastCall(); RuntimeException e = new RuntimeException(); - Map generatedTaskProps = taskConfig(BogusSourceTask.class, false); - worker.startTask(taskId, new TaskConfig(generatedTaskProps), herder, TargetState.STARTED); + ConnectorConfig connConfig = new ConnectorConfig(connectorConfig(SourceSink.SOURCE)); + TaskConfig taskConfig = new TaskConfig(taskConfig(SourceSink.SOURCE)); + worker.startTask(taskId, taskConfig, connConfig, herder, TargetState.STARTED); EasyMock.expectLastCall().andThrow(e); PowerMock.replayAll(); - herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback); + herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback); FutureCallback cb = new FutureCallback<>(); herder.restartTask(taskId, cb); @@ -314,7 +320,7 @@ public class StandaloneHerderTest { @Test public void testCreateAndStop() throws Exception { connector = PowerMock.createMock(BogusSourceConnector.class); - expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false); + expectAdd(SourceSink.SOURCE); // herder.stop() should stop any 
running connectors and tasks even if destroyConnector was not invoked expectStop(); @@ -325,7 +331,7 @@ public class StandaloneHerderTest { PowerMock.replayAll(); - herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback); + herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback); herder.stop(); PowerMock.verifyAll(); @@ -333,7 +339,7 @@ public class StandaloneHerderTest { @Test public void testAccessors() throws Exception { - Map connConfig = connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false); + Map connConfig = connectorConfig(SourceSink.SOURCE); Callback> listConnectorsCb = PowerMock.createMock(Callback.class); Callback connectorInfoCb = PowerMock.createMock(Callback.class); @@ -353,7 +359,7 @@ public class StandaloneHerderTest { // Create connector connector = PowerMock.createMock(BogusSourceConnector.class); - expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false); + expectAdd(SourceSink.SOURCE); // Validate accessors with 1 connector listConnectorsCb.onCompletion(null, Collections.singleton(CONNECTOR_NAME)); @@ -364,7 +370,7 @@ public class StandaloneHerderTest { connectorConfigCb.onCompletion(null, connConfig); EasyMock.expectLastCall(); - TaskInfo taskInfo = new TaskInfo(new ConnectorTaskId(CONNECTOR_NAME, 0), taskConfig(BogusSourceTask.class, false)); + TaskInfo taskInfo = new TaskInfo(new ConnectorTaskId(CONNECTOR_NAME, 0), taskConfig(SourceSink.SOURCE)); taskConfigsCb.onCompletion(null, Arrays.asList(taskInfo)); EasyMock.expectLastCall(); @@ -388,7 +394,7 @@ public class StandaloneHerderTest { @Test public void testPutConnectorConfig() throws Exception { - Map connConfig = connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false); + Map connConfig = connectorConfig(SourceSink.SOURCE); Map newConnConfig = new HashMap<>(connConfig); newConnConfig.put("foo", "bar"); @@ -397,7 +403,7 @@ 
public class StandaloneHerderTest { // Create connector = PowerMock.createMock(BogusSourceConnector.class); - expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false); + expectAdd(SourceSink.SOURCE); // Should get first config connectorConfigCb.onCompletion(null, connConfig); EasyMock.expectLastCall(); @@ -411,7 +417,7 @@ public class StandaloneHerderTest { EasyMock.expect(worker.isRunning(CONNECTOR_NAME)).andReturn(true); // Generate same task config, which should result in no additional action to restart tasks EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, null)) - .andReturn(Collections.singletonList(taskConfig(BogusSourceTask.class, false))); + .andReturn(Collections.singletonList(taskConfig(SourceSink.SOURCE))); worker.isSinkConnector(CONNECTOR_NAME); EasyMock.expectLastCall().andReturn(false); ConnectorInfo newConnInfo = new ConnectorInfo(CONNECTOR_NAME, newConnConfig, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0))); @@ -446,17 +452,14 @@ public class StandaloneHerderTest { PowerMock.verifyAll(); } - private void expectAdd(String name, - Class connClass, - Class taskClass, - boolean sink) throws Exception { + private void expectAdd(SourceSink sourceSink) throws Exception { - Map connectorProps = connectorConfig(name, connClass, sink); + Map connectorProps = connectorConfig(sourceSink); worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorProps)), EasyMock.anyObject(HerderConnectorContext.class), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); EasyMock.expectLastCall(); - EasyMock.expect(worker.isRunning(name)).andReturn(true); + EasyMock.expect(worker.isRunning(CONNECTOR_NAME)).andReturn(true); ConnectorInfo connInfo = new ConnectorInfo(CONNECTOR_NAME, connectorProps, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0))); createCallback.onCompletion(null, new Herder.Created<>(true, connInfo)); @@ -464,16 +467,18 @@ public class StandaloneHerderTest { // And we should 
instantiate the tasks. For a sink task, we should see added properties for // the input topic partitions - Map generatedTaskProps = taskConfig(taskClass, sink); + ConnectorConfig connConfig = new ConnectorConfig(connectorConfig(sourceSink)); + Map generatedTaskProps = taskConfig(sourceSink); + TaskConfig taskConfig = new TaskConfig(generatedTaskProps); - EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, sink ? TOPICS_LIST : null)) + EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, sourceSink == SourceSink.SINK ? TOPICS_LIST : null)) .andReturn(Collections.singletonList(generatedTaskProps)); - worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), new TaskConfig(generatedTaskProps), herder, TargetState.STARTED); + worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), taskConfig, connConfig, herder, TargetState.STARTED); EasyMock.expectLastCall(); worker.isSinkConnector(CONNECTOR_NAME); - PowerMock.expectLastCall().andReturn(sink); + PowerMock.expectLastCall().andReturn(sourceSink == SourceSink.SINK); } private void expectStop() { @@ -490,22 +495,24 @@ public class StandaloneHerderTest { expectStop(); } - private static HashMap connectorConfig(String name, Class connClass, boolean sink) { - HashMap connectorProps = new HashMap<>(); - connectorProps.put(ConnectorConfig.NAME_CONFIG, name); - connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connClass.getName()); - if (sink) { - connectorProps.put(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR); - } - return connectorProps; + private static Map connectorConfig(SourceSink sourceSink) { + Map props = new HashMap<>(); + props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME); + Class connectorClass = sourceSink == SourceSink.SINK ? 
BogusSinkConnector.class : BogusSourceConnector.class; + props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName()); + props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1"); + if (sourceSink == SourceSink.SINK) + props.put(SinkTask.TOPICS_CONFIG, TOPICS_LIST_STR); + return props; } - private static Map taskConfig(Class taskClass, boolean sink) { + private static Map taskConfig(SourceSink sourceSink) { HashMap generatedTaskProps = new HashMap<>(); // Connectors can add any settings, so these are arbitrary generatedTaskProps.put("foo", "bar"); + Class taskClass = sourceSink == SourceSink.SINK ? BogusSinkTask.class : BogusSourceTask.class; generatedTaskProps.put(TaskConfig.TASK_CLASS_CONFIG, taskClass.getName()); - if (sink) + if (sourceSink == SourceSink.SINK) generatedTaskProps.put(SinkTask.TOPICS_CONFIG, TOPICS_LIST_STR); return generatedTaskProps; } diff --git a/docs/connect.html b/docs/connect.html index e5a4ad2b55c..de3b5aaae23 100644 --- a/docs/connect.html +++ b/docs/connect.html @@ -43,7 +43,17 @@ In standalone mode all work is performed in a single process. This configuration > bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...] -The first parameter is the configuration for the worker. This includes settings such as the Kafka connection parameters, serialization format, and how frequently to commit offsets. The provided example should work well with a local cluster running with the default configuration provided by config/server.properties. It will require tweaking to use with a different configuration or production deployment. +The first parameter is the configuration for the worker. This includes settings such as the Kafka connection parameters, serialization format, and how frequently to commit offsets. The provided example should work well with a local cluster running with the default configuration provided by config/server.properties. 
It will require tweaking to use with a different configuration or production deployment. All workers (both standalone and distributed) require a few configs: +
    +
  • bootstrap.servers - List of Kafka servers used to bootstrap connections to Kafka
  • +
  • key.converter - Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the keys in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro.
  • +
  • value.converter - Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the values in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro.
  • +
+ +The important configuration options specific to standalone mode are: +
    +
  • offset.storage.file.filename - File to store offset data in
  • +
The remaining parameters are connector configuration files. You may include as many as you want, but all will execute within the same process (on different threads). @@ -55,7 +65,7 @@ Distributed mode handles automatic balancing of work, allows you to scale up (or The difference is in the class which is started and the configuration parameters which change how the Kafka Connect process decides where to store configurations, how to assign work, and where to store offsets and task statues. In the distributed mode, Kafka Connect stores the offsets, configs and task statuses in Kafka topics. It is recommended to manually create the topics for offset, configs and statuses in order to achieve the desired the number of partitions and replication factors. If the topics are not yet created when starting Kafka Connect, the topics will be auto created with default number of partitions and replication factor, which may not be best suited for its usage. -In particular, the following configuration parameters are critical to set before starting your cluster: +In particular, the following configuration parameters, in addition to the common settings mentioned above, are critical to set before starting your cluster:
  • group.id (default connect-cluster) - unique name for the cluster, used in forming the Connect cluster group; note that this must not conflict with consumer group IDs
  • config.storage.topic (default connect-configs) - topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated topic. You may need to manually create the topic to ensure single partition for the config topic as auto created topics may have multiple partitions.
  • @@ -76,6 +86,8 @@ Most configurations are connector dependent, so they can't be outlined here. How
  • name - Unique name for the connector. Attempting to register again with the same name will fail.
  • connector.class - The Java class for the connector
  • tasks.max - The maximum number of tasks that should be created for this connector. The connector may create fewer tasks if it cannot achieve this level of parallelism.
  • +
  • key.converter - (optional) Override the default key converter set by the worker.
  • +
  • value.converter - (optional) Override the default value converter set by the worker.
The connector.class config supports several formats: the full name or alias of the class for this connector. If the connector is org.apache.kafka.connect.file.FileStreamSinkConnector, you can either specify this full name or use FileStreamSink or FileStreamSinkConnector to make the configuration a bit shorter. diff --git a/tests/kafkatest/tests/connect/connect_rest_test.py b/tests/kafkatest/tests/connect/connect_rest_test.py index 0b004996caf..70bc32c9d5b 100644 --- a/tests/kafkatest/tests/connect/connect_rest_test.py +++ b/tests/kafkatest/tests/connect/connect_rest_test.py @@ -29,8 +29,8 @@ class ConnectRestApiTest(KafkaTest): FILE_SOURCE_CONNECTOR = 'org.apache.kafka.connect.file.FileStreamSourceConnector' FILE_SINK_CONNECTOR = 'org.apache.kafka.connect.file.FileStreamSinkConnector' - FILE_SOURCE_CONFIGS = {'name', 'connector.class', 'tasks.max', 'topic', 'file'} - FILE_SINK_CONFIGS = {'name', 'connector.class', 'tasks.max', 'topics', 'file'} + FILE_SOURCE_CONFIGS = {'name', 'connector.class', 'tasks.max', 'key.converter', 'value.converter', 'topic', 'file'} + FILE_SINK_CONFIGS = {'name', 'connector.class', 'tasks.max', 'key.converter', 'value.converter', 'topics', 'file'} INPUT_FILE = "/mnt/connect.input" INPUT_FILE2 = "/mnt/connect.input2" diff --git a/tests/kafkatest/tests/connect/connect_test.py b/tests/kafkatest/tests/connect/connect_test.py index 91843900e73..93f57344fb9 100644 --- a/tests/kafkatest/tests/connect/connect_test.py +++ b/tests/kafkatest/tests/connect/connect_test.py @@ -63,10 +63,17 @@ class ConnectStandaloneFileTest(Test): @parametrize(converter="org.apache.kafka.connect.storage.StringConverter", schemas=None) @matrix(security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL]) def test_file_source_and_sink(self, converter="org.apache.kafka.connect.json.JsonConverter", schemas=True, security_protocol='PLAINTEXT'): + """ + Validates basic end-to-end functionality of Connect standalone using the file source and sink converters. 
Includes + parameterizations to test different converters (which also test per-connector converter overrides), schema/schemaless + modes, and security support. + """ assert converter != None, "converter type must be set" - # Template parameters - self.key_converter = converter - self.value_converter = converter + # Template parameters. Note that we don't set key/value.converter. These default to JsonConverter and we validate + # converter overrides via the connector configuration. + if converter != "org.apache.kafka.connect.json.JsonConverter": + self.override_key_converter = converter + self.override_value_converter = converter self.schemas = schemas self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk, diff --git a/tests/kafkatest/tests/connect/templates/connect-file-sink.properties b/tests/kafkatest/tests/connect/templates/connect-file-sink.properties index 216dab55592..bff002bbdb0 100644 --- a/tests/kafkatest/tests/connect/templates/connect-file-sink.properties +++ b/tests/kafkatest/tests/connect/templates/connect-file-sink.properties @@ -17,4 +17,12 @@ name=local-file-sink connector.class={{ FILE_SINK_CONNECTOR }} tasks.max=1 file={{ OUTPUT_FILE }} -topics={{ TOPIC }} \ No newline at end of file +topics={{ TOPIC }} + +# For testing per-connector converters +{% if override_key_converter is defined %} +key.converter={{ override_key_converter }} +{% endif %} +{% if override_value_converter is defined %} +value.converter={{ override_value_converter }} +{% endif %} \ No newline at end of file diff --git a/tests/kafkatest/tests/connect/templates/connect-file-source.properties b/tests/kafkatest/tests/connect/templates/connect-file-source.properties index bff9720b8bf..800d6a0751b 100644 --- a/tests/kafkatest/tests/connect/templates/connect-file-source.properties +++ b/tests/kafkatest/tests/connect/templates/connect-file-source.properties @@ -17,4 +17,12 @@ name=local-file-source connector.class={{ FILE_SOURCE_CONNECTOR }} tasks.max=1 file={{ INPUT_FILE
}} -topic={{ TOPIC }} \ No newline at end of file +topic={{ TOPIC }} + +# For testing per-connector converters +{% if override_key_converter is defined %} +key.converter={{ override_key_converter }} +{% endif %} +{% if override_value_converter is defined %} +value.converter={{ override_value_converter }} +{% endif %}