mirror of https://github.com/apache/kafka.git
KAFKA-2422: Allow copycat connector plugins to be aliased to simpler names
…names Author: Gwen Shapira <cshapi@gmail.com> Reviewers: Ewen Cheslack-Postava <ewen@confluent.io> Closes #687 from gwenshap/KAFKA-2422
This commit is contained in:
parent
f9642e2a98
commit
b93f48f749
|
@ -36,6 +36,7 @@ def powermock_easymock='org.powermock:powermock-api-easymock:1.6.3'
|
||||||
def jackson_version = '2.6.3'
|
def jackson_version = '2.6.3'
|
||||||
def jetty_version = '9.2.14.v20151106'
|
def jetty_version = '9.2.14.v20151106'
|
||||||
def jersey_version = '2.22.1'
|
def jersey_version = '2.22.1'
|
||||||
|
def reflections_version = '0.9.10'
|
||||||
|
|
||||||
allprojects {
|
allprojects {
|
||||||
apply plugin: 'idea'
|
apply plugin: 'idea'
|
||||||
|
@ -790,6 +791,7 @@ project(':connect:runtime') {
|
||||||
compile "org.eclipse.jetty:jetty-servlet:$jetty_version"
|
compile "org.eclipse.jetty:jetty-servlet:$jetty_version"
|
||||||
compile "com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:$jackson_version"
|
compile "com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:$jackson_version"
|
||||||
compile "org.glassfish.jersey.containers:jersey-container-servlet:$jersey_version"
|
compile "org.glassfish.jersey.containers:jersey-container-servlet:$jersey_version"
|
||||||
|
compile "org.reflections:reflections:$reflections_version"
|
||||||
|
|
||||||
testCompile "$junit"
|
testCompile "$junit"
|
||||||
testCompile "$easymock"
|
testCompile "$easymock"
|
||||||
|
|
|
@ -171,6 +171,8 @@
|
||||||
|
|
||||||
<subpackage name="runtime">
|
<subpackage name="runtime">
|
||||||
<allow pkg="org.apache.kafka.connect" />
|
<allow pkg="org.apache.kafka.connect" />
|
||||||
|
<allow pkg="org.reflections"/>
|
||||||
|
<allow pkg="org.reflections.util"/>
|
||||||
|
|
||||||
<subpackage name="rest">
|
<subpackage name="rest">
|
||||||
<allow pkg="org.eclipse.jetty" />
|
<allow pkg="org.eclipse.jetty" />
|
||||||
|
|
|
@ -14,7 +14,7 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
name=local-file-sink
|
name=local-file-sink
|
||||||
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
|
connector.class=FileStreamSink
|
||||||
tasks.max=1
|
tasks.max=1
|
||||||
file=test.sink.txt
|
file=test.sink.txt
|
||||||
topics=connect-test
|
topics=connect-test
|
|
@ -14,7 +14,7 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
name=local-file-source
|
name=local-file-source
|
||||||
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
|
connector.class=FileStreamSource
|
||||||
tasks.max=1
|
tasks.max=1
|
||||||
file=test.txt
|
file=test.txt
|
||||||
topic=connect-test
|
topic=connect-test
|
|
@ -43,7 +43,9 @@ public class ConnectorConfig extends AbstractConfig {
|
||||||
|
|
||||||
public static final String CONNECTOR_CLASS_CONFIG = "connector.class";
|
public static final String CONNECTOR_CLASS_CONFIG = "connector.class";
|
||||||
private static final String CONNECTOR_CLASS_DOC =
|
private static final String CONNECTOR_CLASS_DOC =
|
||||||
"Name of the class for this connector. Must be a subclass of org.apache.kafka.connect.connector.Connector";
|
"Name or alias of the class for this connector. Must be a subclass of org.apache.kafka.connect.connector.Connector. " +
|
||||||
|
"If the connector is org.apache.kafka.connect.file.FileStreamSinkConnector, you can either specify this full name, " +
|
||||||
|
" or use \"FileStreamSink\" or \"FileStreamSinkConnector\" to make the configuration a bit shorter";
|
||||||
|
|
||||||
public static final String TASKS_MAX_CONFIG = "tasks.max";
|
public static final String TASKS_MAX_CONFIG = "tasks.max";
|
||||||
private static final String TASKS_MAX_DOC = "Maximum number of tasks to use for this connector.";
|
private static final String TASKS_MAX_DOC = "Maximum number of tasks to use for this connector.";
|
||||||
|
@ -58,7 +60,7 @@ public class ConnectorConfig extends AbstractConfig {
|
||||||
static {
|
static {
|
||||||
config = new ConfigDef()
|
config = new ConfigDef()
|
||||||
.define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC)
|
.define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC)
|
||||||
.define(CONNECTOR_CLASS_CONFIG, Type.CLASS, Importance.HIGH, CONNECTOR_CLASS_DOC)
|
.define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC)
|
||||||
.define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, Importance.HIGH, TASKS_MAX_DOC)
|
.define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, Importance.HIGH, TASKS_MAX_DOC)
|
||||||
.define(TOPICS_CONFIG, Type.LIST, TOPICS_DEFAULT, Importance.HIGH, TOPICS_DOC);
|
.define(TOPICS_CONFIG, Type.LIST, TOPICS_DEFAULT, Importance.HIGH, TOPICS_DOC);
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.kafka.connect.connector.Connector;
|
||||||
import org.apache.kafka.connect.connector.ConnectorContext;
|
import org.apache.kafka.connect.connector.ConnectorContext;
|
||||||
import org.apache.kafka.connect.connector.Task;
|
import org.apache.kafka.connect.connector.Task;
|
||||||
import org.apache.kafka.connect.errors.ConnectException;
|
import org.apache.kafka.connect.errors.ConnectException;
|
||||||
|
import org.apache.kafka.connect.sink.SinkConnector;
|
||||||
import org.apache.kafka.connect.sink.SinkTask;
|
import org.apache.kafka.connect.sink.SinkTask;
|
||||||
import org.apache.kafka.connect.source.SourceTask;
|
import org.apache.kafka.connect.source.SourceTask;
|
||||||
import org.apache.kafka.connect.storage.Converter;
|
import org.apache.kafka.connect.storage.Converter;
|
||||||
|
@ -35,15 +36,20 @@ import org.apache.kafka.connect.storage.OffsetStorageReader;
|
||||||
import org.apache.kafka.connect.storage.OffsetStorageWriter;
|
import org.apache.kafka.connect.storage.OffsetStorageWriter;
|
||||||
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
|
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
|
||||||
import org.apache.kafka.connect.util.ConnectorTaskId;
|
import org.apache.kafka.connect.util.ConnectorTaskId;
|
||||||
|
import org.reflections.Reflections;
|
||||||
|
import org.reflections.util.ClasspathHelper;
|
||||||
|
import org.reflections.util.ConfigurationBuilder;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* <p>
|
||||||
* Worker runs a (dynamic) set of tasks in a set of threads, doing the work of actually moving
|
* Worker runs a (dynamic) set of tasks in a set of threads, doing the work of actually moving
|
||||||
|
@ -170,15 +176,9 @@ public class Worker {
|
||||||
*/
|
*/
|
||||||
public void addConnector(ConnectorConfig connConfig, ConnectorContext ctx) {
|
public void addConnector(ConnectorConfig connConfig, ConnectorContext ctx) {
|
||||||
String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
|
String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
|
||||||
Class<?> maybeConnClass = connConfig.getClass(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
|
Class<? extends Connector> connClass = getConnectorClass(connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
|
||||||
log.info("Creating connector {} of type {}", connName, maybeConnClass.getName());
|
|
||||||
|
|
||||||
Class<? extends Connector> connClass;
|
log.info("Creating connector {} of type {}", connName, connClass.getName());
|
||||||
try {
|
|
||||||
connClass = maybeConnClass.asSubclass(Connector.class);
|
|
||||||
} catch (ClassCastException e) {
|
|
||||||
throw new ConnectException("Specified class is not a subclass of Connector: " + maybeConnClass.getName());
|
|
||||||
}
|
|
||||||
|
|
||||||
if (connectors.containsKey(connName))
|
if (connectors.containsKey(connName))
|
||||||
throw new ConnectException("Connector with name " + connName + " already exists");
|
throw new ConnectException("Connector with name " + connName + " already exists");
|
||||||
|
@ -197,6 +197,54 @@ public class Worker {
|
||||||
log.info("Finished creating connector {}", connName);
|
log.info("Finished creating connector {}", connName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Now that the configuration doesn't contain the actual class name, we need to be able to tell the herder whether a connector is a Sink */
|
||||||
|
public boolean isSinkConnector(String connName) {
|
||||||
|
return SinkConnector.class.isAssignableFrom(connectors.get(connName).getClass());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// Iterate over our entire classpath to find all the connectors and hopefully one of them matches the alias from the connector configration
|
||||||
|
private Class<? extends Connector> getConnectorClass(String connectorAlias) {
|
||||||
|
Reflections reflections = new Reflections(new ConfigurationBuilder()
|
||||||
|
.setUrls(ClasspathHelper.forJavaClassPath()));
|
||||||
|
|
||||||
|
Set<Class<? extends Connector>> connectors = reflections.getSubTypesOf(Connector.class);
|
||||||
|
|
||||||
|
List<Class<? extends Connector>> results = new ArrayList<>();
|
||||||
|
|
||||||
|
for (Class<? extends Connector> connector: connectors) {
|
||||||
|
// Configuration included the fully qualified class name
|
||||||
|
if (connector.getName().equals(connectorAlias))
|
||||||
|
results.add(connector);
|
||||||
|
|
||||||
|
// Configuration included the class name but not package
|
||||||
|
if (connector.getSimpleName().equals(connectorAlias))
|
||||||
|
results.add(connector);
|
||||||
|
|
||||||
|
// Configuration included a short version of the name (i.e. FileStreamSink instead of FileStreamSinkConnector)
|
||||||
|
if (connector.getSimpleName().equals(connectorAlias + "Connector"))
|
||||||
|
results.add(connector);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (results.isEmpty())
|
||||||
|
throw new ConnectException("Failed to find any class that implements Connector and which name matches " + connectorAlias + " available connectors are: " + connectorNames(connectors));
|
||||||
|
if (results.size() > 1) {
|
||||||
|
throw new ConnectException("More than one connector matches alias " + connectorAlias + ". Please use full package + class name instead. Classes found: " + connectorNames(results));
|
||||||
|
}
|
||||||
|
|
||||||
|
// We just validated that we have exactly one result, so this is safe
|
||||||
|
return results.get(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
private String connectorNames(Collection<Class<? extends Connector>> connectors) {
|
||||||
|
StringBuilder names = new StringBuilder();
|
||||||
|
for (Class<?> c : connectors)
|
||||||
|
names.append(c.getName()).append(", ");
|
||||||
|
|
||||||
|
return names.substring(0, names.toString().length() - 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private static Connector instantiateConnector(Class<? extends Connector> connClass) {
|
private static Connector instantiateConnector(Class<? extends Connector> connClass) {
|
||||||
try {
|
try {
|
||||||
return Utils.newInstance(connClass);
|
return Utils.newInstance(connClass);
|
||||||
|
|
|
@ -34,7 +34,6 @@ import org.apache.kafka.connect.runtime.Worker;
|
||||||
import org.apache.kafka.connect.runtime.rest.RestServer;
|
import org.apache.kafka.connect.runtime.rest.RestServer;
|
||||||
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
|
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
|
||||||
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
|
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
|
||||||
import org.apache.kafka.connect.sink.SinkConnector;
|
|
||||||
import org.apache.kafka.connect.storage.KafkaConfigStorage;
|
import org.apache.kafka.connect.storage.KafkaConfigStorage;
|
||||||
import org.apache.kafka.connect.util.Callback;
|
import org.apache.kafka.connect.util.Callback;
|
||||||
import org.apache.kafka.connect.util.ConnectorTaskId;
|
import org.apache.kafka.connect.util.ConnectorTaskId;
|
||||||
|
@ -709,9 +708,8 @@ public class DistributedHerder implements Herder, Runnable {
|
||||||
try {
|
try {
|
||||||
Map<String, String> configs = configState.connectorConfig(connName);
|
Map<String, String> configs = configState.connectorConfig(connName);
|
||||||
ConnectorConfig connConfig = new ConnectorConfig(configs);
|
ConnectorConfig connConfig = new ConnectorConfig(configs);
|
||||||
|
|
||||||
List<String> sinkTopics = null;
|
List<String> sinkTopics = null;
|
||||||
if (SinkConnector.class.isAssignableFrom(connConfig.getClass(ConnectorConfig.CONNECTOR_CLASS_CONFIG)))
|
if (worker.isSinkConnector(connName))
|
||||||
sinkTopics = connConfig.getList(ConnectorConfig.TOPICS_CONFIG);
|
sinkTopics = connConfig.getList(ConnectorConfig.TOPICS_CONFIG);
|
||||||
|
|
||||||
final List<Map<String, String>> taskProps
|
final List<Map<String, String>> taskProps
|
||||||
|
|
|
@ -91,14 +91,14 @@ public class WorkerTest extends ThreadedTest {
|
||||||
ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
|
ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
|
||||||
|
|
||||||
PowerMock.mockStatic(Worker.class);
|
PowerMock.mockStatic(Worker.class);
|
||||||
PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{TestConnector.class}).andReturn(connector);
|
PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{WorkerTestConnector.class}).andReturn(connector);
|
||||||
EasyMock.expect(connector.version()).andReturn("1.0");
|
EasyMock.expect(connector.version()).andReturn("1.0");
|
||||||
|
|
||||||
Map<String, String> props = new HashMap<>();
|
Map<String, String> props = new HashMap<>();
|
||||||
props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
|
props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
|
||||||
props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
|
props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
|
||||||
props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
|
props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
|
||||||
props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestConnector.class.getName());
|
props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName());
|
||||||
|
|
||||||
connector.initialize(ctx);
|
connector.initialize(ctx);
|
||||||
EasyMock.expectLastCall();
|
EasyMock.expectLastCall();
|
||||||
|
@ -135,6 +135,110 @@ public class WorkerTest extends ThreadedTest {
|
||||||
PowerMock.verifyAll();
|
PowerMock.verifyAll();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddConnectorByAlias() throws Exception {
|
||||||
|
offsetBackingStore.configure(EasyMock.anyObject(Map.class));
|
||||||
|
EasyMock.expectLastCall();
|
||||||
|
offsetBackingStore.start();
|
||||||
|
EasyMock.expectLastCall();
|
||||||
|
|
||||||
|
// Create
|
||||||
|
Connector connector = PowerMock.createMock(Connector.class);
|
||||||
|
ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
|
||||||
|
|
||||||
|
PowerMock.mockStatic(Worker.class);
|
||||||
|
PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{WorkerTestConnector.class}).andReturn(connector);
|
||||||
|
EasyMock.expect(connector.version()).andReturn("1.0");
|
||||||
|
|
||||||
|
Map<String, String> props = new HashMap<>();
|
||||||
|
props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
|
||||||
|
props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
|
||||||
|
props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
|
||||||
|
props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "WorkerTestConnector");
|
||||||
|
|
||||||
|
connector.initialize(ctx);
|
||||||
|
EasyMock.expectLastCall();
|
||||||
|
connector.start(props);
|
||||||
|
EasyMock.expectLastCall();
|
||||||
|
|
||||||
|
// Remove
|
||||||
|
connector.stop();
|
||||||
|
EasyMock.expectLastCall();
|
||||||
|
|
||||||
|
offsetBackingStore.stop();
|
||||||
|
EasyMock.expectLastCall();
|
||||||
|
|
||||||
|
PowerMock.replayAll();
|
||||||
|
|
||||||
|
|
||||||
|
worker = new Worker(new MockTime(), config, offsetBackingStore);
|
||||||
|
worker.start();
|
||||||
|
|
||||||
|
ConnectorConfig config = new ConnectorConfig(props);
|
||||||
|
assertEquals(Collections.emptySet(), worker.connectorNames());
|
||||||
|
worker.addConnector(config, ctx);
|
||||||
|
assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
|
||||||
|
|
||||||
|
worker.stopConnector(CONNECTOR_ID);
|
||||||
|
assertEquals(Collections.emptySet(), worker.connectorNames());
|
||||||
|
// Nothing should be left, so this should effectively be a nop
|
||||||
|
worker.stop();
|
||||||
|
|
||||||
|
PowerMock.verifyAll();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddConnectorByShortAlias() throws Exception {
|
||||||
|
offsetBackingStore.configure(EasyMock.anyObject(Map.class));
|
||||||
|
EasyMock.expectLastCall();
|
||||||
|
offsetBackingStore.start();
|
||||||
|
EasyMock.expectLastCall();
|
||||||
|
|
||||||
|
// Create
|
||||||
|
Connector connector = PowerMock.createMock(Connector.class);
|
||||||
|
ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
|
||||||
|
|
||||||
|
PowerMock.mockStatic(Worker.class);
|
||||||
|
PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{WorkerTestConnector.class}).andReturn(connector);
|
||||||
|
EasyMock.expect(connector.version()).andReturn("1.0");
|
||||||
|
|
||||||
|
Map<String, String> props = new HashMap<>();
|
||||||
|
props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
|
||||||
|
props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
|
||||||
|
props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
|
||||||
|
props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "WorkerTest");
|
||||||
|
|
||||||
|
connector.initialize(ctx);
|
||||||
|
EasyMock.expectLastCall();
|
||||||
|
connector.start(props);
|
||||||
|
EasyMock.expectLastCall();
|
||||||
|
|
||||||
|
// Remove
|
||||||
|
connector.stop();
|
||||||
|
EasyMock.expectLastCall();
|
||||||
|
|
||||||
|
offsetBackingStore.stop();
|
||||||
|
EasyMock.expectLastCall();
|
||||||
|
|
||||||
|
PowerMock.replayAll();
|
||||||
|
|
||||||
|
worker = new Worker(new MockTime(), config, offsetBackingStore);
|
||||||
|
worker.start();
|
||||||
|
|
||||||
|
ConnectorConfig config = new ConnectorConfig(props);
|
||||||
|
assertEquals(Collections.emptySet(), worker.connectorNames());
|
||||||
|
worker.addConnector(config, ctx);
|
||||||
|
assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
|
||||||
|
|
||||||
|
worker.stopConnector(CONNECTOR_ID);
|
||||||
|
assertEquals(Collections.emptySet(), worker.connectorNames());
|
||||||
|
// Nothing should be left, so this should effectively be a nop
|
||||||
|
worker.stop();
|
||||||
|
|
||||||
|
PowerMock.verifyAll();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test(expected = ConnectException.class)
|
@Test(expected = ConnectException.class)
|
||||||
public void testStopInvalidConnector() {
|
public void testStopInvalidConnector() {
|
||||||
offsetBackingStore.configure(EasyMock.anyObject(Map.class));
|
offsetBackingStore.configure(EasyMock.anyObject(Map.class));
|
||||||
|
@ -162,14 +266,14 @@ public class WorkerTest extends ThreadedTest {
|
||||||
ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
|
ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
|
||||||
|
|
||||||
PowerMock.mockStatic(Worker.class);
|
PowerMock.mockStatic(Worker.class);
|
||||||
PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{TestConnector.class}).andReturn(connector);
|
PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{WorkerTestConnector.class}).andReturn(connector);
|
||||||
EasyMock.expect(connector.version()).andReturn("1.0");
|
EasyMock.expect(connector.version()).andReturn("1.0");
|
||||||
|
|
||||||
Map<String, String> props = new HashMap<>();
|
Map<String, String> props = new HashMap<>();
|
||||||
props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
|
props.put(ConnectorConfig.TOPICS_CONFIG, "foo,bar");
|
||||||
props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
|
props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
|
||||||
props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
|
props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
|
||||||
props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestConnector.class.getName());
|
props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName());
|
||||||
|
|
||||||
connector.initialize(ctx);
|
connector.initialize(ctx);
|
||||||
EasyMock.expectLastCall();
|
EasyMock.expectLastCall();
|
||||||
|
@ -345,7 +449,8 @@ public class WorkerTest extends ThreadedTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private static class TestConnector extends Connector {
|
/* Name here needs to be unique as we are testing the aliasing mechanism */
|
||||||
|
private static class WorkerTestConnector extends Connector {
|
||||||
@Override
|
@Override
|
||||||
public String version() {
|
public String version() {
|
||||||
return "1.0";
|
return "1.0";
|
||||||
|
|
|
@ -136,6 +136,7 @@ public class DistributedHerderTest {
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
worker = PowerMock.createMock(Worker.class);
|
worker = PowerMock.createMock(Worker.class);
|
||||||
|
EasyMock.expect(worker.isSinkConnector(CONN1)).andStubReturn(Boolean.FALSE);
|
||||||
time = new MockTime();
|
time = new MockTime();
|
||||||
|
|
||||||
herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"backoff"},
|
herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"backoff"},
|
||||||
|
@ -487,6 +488,7 @@ public class DistributedHerderTest {
|
||||||
worker.addConnector(EasyMock.capture(capturedUpdatedConfig), EasyMock.<ConnectorContext>anyObject());
|
worker.addConnector(EasyMock.capture(capturedUpdatedConfig), EasyMock.<ConnectorContext>anyObject());
|
||||||
PowerMock.expectLastCall();
|
PowerMock.expectLastCall();
|
||||||
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
|
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
|
||||||
|
|
||||||
member.poll(EasyMock.anyInt());
|
member.poll(EasyMock.anyInt());
|
||||||
PowerMock.expectLastCall();
|
PowerMock.expectLastCall();
|
||||||
|
|
||||||
|
|
|
@ -14,7 +14,7 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
name=local-file-sink
|
name=local-file-sink
|
||||||
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
|
connector.class=FileStreamSink
|
||||||
tasks.max=1
|
tasks.max=1
|
||||||
file={{ OUTPUT_FILE }}
|
file={{ OUTPUT_FILE }}
|
||||||
topics={{ TOPIC }}
|
topics={{ TOPIC }}
|
|
@ -14,7 +14,7 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
name=local-file-source
|
name=local-file-source
|
||||||
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
|
connector.class=FileStreamSource
|
||||||
tasks.max=1
|
tasks.max=1
|
||||||
file={{ INPUT_FILE }}
|
file={{ INPUT_FILE }}
|
||||||
topic={{ TOPIC }}
|
topic={{ TOPIC }}
|
Loading…
Reference in New Issue