KAFKA-3073: Add topic regex support for Connect sinks

There are more methods that had to be touched than I anticipated when writing [the KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-215%3A+Add+topic+regex+support+for+Connect+sinks).

The implementation here is now complete and includes a test that verifies that there's a call to `consumer.subscribe(Pattern, RebalanceHandler)` when `topics.regex` is provided.

Author: Jeff Klukas <jeff@klukas.net>

Reviewers: Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #4151 from jklukas/connect-topics.regex
This commit is contained in:
Jeff Klukas 2017-11-21 16:01:16 -08:00 committed by Ewen Cheslack-Postava
parent fd8eb268d6
commit 049342e440
11 changed files with 134 additions and 50 deletions

View File

@ -61,6 +61,14 @@ public abstract class SinkTask implements Task {
*/ */
public static final String TOPICS_CONFIG = "topics"; public static final String TOPICS_CONFIG = "topics";
/**
* <p>
* The configuration key that provides a regex specifying which topics to include as inputs
* for this SinkTask.
* </p>
*/
public static final String TOPICS_REGEX_CONFIG = "topics.regex";
protected SinkTaskContext context; protected SinkTaskContext context;
/** /**

View File

@ -18,6 +18,8 @@ package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.transforms.util.RegexValidator;
import java.util.Map; import java.util.Map;
@ -27,13 +29,21 @@ import java.util.Map;
public class SinkConnectorConfig extends ConnectorConfig { public class SinkConnectorConfig extends ConnectorConfig {
public static final String TOPICS_CONFIG = "topics"; public static final String TOPICS_CONFIG = SinkTask.TOPICS_CONFIG;
private static final String TOPICS_DOC = ""; private static final String TOPICS_DOC = "List of topics to consume, separated by commas";
public static final String TOPICS_DEFAULT = ""; public static final String TOPICS_DEFAULT = "";
private static final String TOPICS_DISPLAY = "Topics"; private static final String TOPICS_DISPLAY = "Topics";
private static final String TOPICS_REGEX_CONFIG = SinkTask.TOPICS_REGEX_CONFIG;
private static final String TOPICS_REGEX_DOC = "Regular expression giving topics to consume. " +
"Under the hood, the regex is compiled to a <code>java.util.regex.Pattern</code>. " +
"Only one of " + TOPICS_CONFIG + " or " + TOPICS_REGEX_CONFIG + " should be specified.";
public static final String TOPICS_REGEX_DEFAULT = "";
private static final String TOPICS_REGEX_DISPLAY = "Topics regex";
static ConfigDef config = ConnectorConfig.configDef() static ConfigDef config = ConnectorConfig.configDef()
.define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY); .define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY)
.define(TOPICS_REGEX_CONFIG, ConfigDef.Type.STRING, TOPICS_REGEX_DEFAULT, new RegexValidator(), ConfigDef.Importance.HIGH, TOPICS_REGEX_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_REGEX_DISPLAY);
public static ConfigDef configDef() { public static ConfigDef configDef() {
return config; return config;

View File

@ -254,17 +254,18 @@ public class Worker {
* Get a list of updated task properties for the tasks of this connector. * Get a list of updated task properties for the tasks of this connector.
* *
* @param connName the connector name. * @param connName the connector name.
* @param maxTasks the maxinum number of tasks.
* @param sinkTopics a list of sink topics.
* @return a list of updated tasks properties. * @return a list of updated tasks properties.
*/ */
public List<Map<String, String>> connectorTaskConfigs(String connName, int maxTasks, List<String> sinkTopics) { public List<Map<String, String>> connectorTaskConfigs(String connName, ConnectorConfig connConfig) {
log.trace("Reconfiguring connector tasks for {}", connName); log.trace("Reconfiguring connector tasks for {}", connName);
WorkerConnector workerConnector = connectors.get(connName); WorkerConnector workerConnector = connectors.get(connName);
if (workerConnector == null) if (workerConnector == null)
throw new ConnectException("Connector " + connName + " not found in this worker."); throw new ConnectException("Connector " + connName + " not found in this worker.");
int maxTasks = connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG);
Map<String, String> connOriginals = connConfig.originalsStrings();
Connector connector = workerConnector.connector(); Connector connector = workerConnector.connector();
List<Map<String, String>> result = new ArrayList<>(); List<Map<String, String>> result = new ArrayList<>();
ClassLoader savedLoader = plugins.currentThreadLoader(); ClassLoader savedLoader = plugins.currentThreadLoader();
@ -275,8 +276,11 @@ public class Worker {
// Ensure we don't modify the connector's copy of the config // Ensure we don't modify the connector's copy of the config
Map<String, String> taskConfig = new HashMap<>(taskProps); Map<String, String> taskConfig = new HashMap<>(taskProps);
taskConfig.put(TaskConfig.TASK_CLASS_CONFIG, taskClassName); taskConfig.put(TaskConfig.TASK_CLASS_CONFIG, taskClassName);
if (sinkTopics != null) { if (connOriginals.containsKey(SinkTask.TOPICS_CONFIG)) {
taskConfig.put(SinkTask.TOPICS_CONFIG, Utils.join(sinkTopics, ",")); taskConfig.put(SinkTask.TOPICS_CONFIG, connOriginals.get(SinkTask.TOPICS_CONFIG));
}
if (connOriginals.containsKey(SinkTask.TOPICS_REGEX_CONFIG)) {
taskConfig.put(SinkTask.TOPICS_REGEX_CONFIG, connOriginals.get(SinkTask.TOPICS_REGEX_CONFIG));
} }
result.add(taskConfig); result.add(taskConfig);
} }

View File

@ -25,6 +25,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Avg;
@ -53,6 +54,7 @@ import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.regex.Pattern;
import static java.util.Collections.singleton; import static java.util.Collections.singleton;
@ -258,11 +260,31 @@ class WorkerSinkTask extends WorkerTask {
*/ */
protected void initializeAndStart() { protected void initializeAndStart() {
String topicsStr = taskConfig.get(SinkTask.TOPICS_CONFIG); String topicsStr = taskConfig.get(SinkTask.TOPICS_CONFIG);
if (topicsStr == null || topicsStr.isEmpty()) boolean topicsStrPresent = topicsStr != null && !topicsStr.trim().isEmpty();
throw new ConnectException("Sink tasks require a list of topics.");
String[] topics = topicsStr.split(","); String topicsRegexStr = taskConfig.get(SinkTask.TOPICS_REGEX_CONFIG);
consumer.subscribe(Arrays.asList(topics), new HandleRebalance()); boolean topicsRegexStrPresent = topicsRegexStr != null && !topicsRegexStr.trim().isEmpty();
log.debug("{} Initializing and starting task for topics {}", this, topics);
if (topicsStrPresent && topicsRegexStrPresent) {
throw new ConfigException(SinkTask.TOPICS_CONFIG + " and " + SinkTask.TOPICS_REGEX_CONFIG +
" are mutually exclusive options, but both are set.");
}
if (!topicsStrPresent && !topicsRegexStrPresent) {
throw new ConfigException("Must configure one of " +
SinkTask.TOPICS_CONFIG + " or " + SinkTask.TOPICS_REGEX_CONFIG);
}
if (topicsStrPresent) {
String[] topics = topicsStr.split(",");
consumer.subscribe(Arrays.asList(topics), new HandleRebalance());
log.debug("{} Initializing and starting task for topics {}", this, topics);
} else {
Pattern pattern = Pattern.compile(topicsRegexStr);
consumer.subscribe(pattern, new HandleRebalance());
log.debug("{} Initializing and starting task for topics regex {}", this, topicsRegexStr);
}
task.initialize(context); task.initialize(context);
task.start(taskConfig); task.start(taskConfig);
log.info("{} Sink task finished initialization and start", this); log.info("{} Sink task finished initialization and start", this);

View File

@ -974,16 +974,13 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
Map<String, String> configs = configState.connectorConfig(connName); Map<String, String> configs = configState.connectorConfig(connName);
ConnectorConfig connConfig; ConnectorConfig connConfig;
List<String> sinkTopics = null;
if (worker.isSinkConnector(connName)) { if (worker.isSinkConnector(connName)) {
connConfig = new SinkConnectorConfig(plugins(), configs); connConfig = new SinkConnectorConfig(plugins(), configs);
sinkTopics = connConfig.getList(SinkConnectorConfig.TOPICS_CONFIG);
} else { } else {
connConfig = new SourceConnectorConfig(plugins(), configs); connConfig = new SourceConnectorConfig(plugins(), configs);
} }
final List<Map<String, String>> taskProps final List<Map<String, String>> taskProps = worker.connectorTaskConfigs(connName, connConfig);
= worker.connectorTaskConfigs(connName, connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG), sinkTopics);
boolean changed = false; boolean changed = false;
int currentNumTasks = configState.taskCount(connName); int currentNumTasks = configState.taskCount(connName);
if (taskProps.size() != currentNumTasks) { if (taskProps.size() != currentNumTasks) {

View File

@ -255,19 +255,11 @@ public class StandaloneHerder extends AbstractHerder {
private List<Map<String, String>> recomputeTaskConfigs(String connName) { private List<Map<String, String>> recomputeTaskConfigs(String connName) {
Map<String, String> config = configState.connectorConfig(connName); Map<String, String> config = configState.connectorConfig(connName);
ConnectorConfig connConfig; ConnectorConfig connConfig = worker.isSinkConnector(connName) ?
if (worker.isSinkConnector(connName)) { new SinkConnectorConfig(plugins(), config) :
connConfig = new SinkConnectorConfig(plugins(), config); new SourceConnectorConfig(plugins(), config);
return worker.connectorTaskConfigs(connName,
connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG),
connConfig.getList(SinkConnectorConfig.TOPICS_CONFIG));
} else {
connConfig = new SourceConnectorConfig(plugins(), config);
return worker.connectorTaskConfigs(connName,
connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG),
null);
}
return worker.connectorTaskConfigs(connName, connConfig);
} }
private void createConnectorTasks(String connName, TargetState initialState) { private void createConnectorTasks(String connName, TargetState initialState) {

View File

@ -67,6 +67,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import static java.util.Arrays.asList; import static java.util.Arrays.asList;
import static java.util.Collections.singleton; import static java.util.Collections.singleton;
@ -127,6 +128,7 @@ public class WorkerSinkTaskTest {
@Mock @Mock
private KafkaConsumer<byte[], byte[]> consumer; private KafkaConsumer<byte[], byte[]> consumer;
private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture(); private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture();
private Capture<Pattern> topicsRegex = EasyMock.newCapture();
private long recordsReturnedTp1; private long recordsReturnedTp1;
private long recordsReturnedTp3; private long recordsReturnedTp3;
@ -1143,6 +1145,41 @@ public class WorkerSinkTaskTest {
PowerMock.verifyAll(); PowerMock.verifyAll();
} }
@Test
public void testTopicsRegex() throws Exception {
Map<String, String> props = new HashMap<>(TASK_PROPS);
props.remove("topics");
props.put("topics.regex", "te.*");
TaskConfig taskConfig = new TaskConfig(props);
createTask(TargetState.PAUSED);
PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer);
consumer.subscribe(EasyMock.capture(topicsRegex), EasyMock.capture(rebalanceListener));
PowerMock.expectLastCall();
sinkTask.initialize(EasyMock.capture(sinkTaskContext));
PowerMock.expectLastCall();
sinkTask.start(props);
PowerMock.expectLastCall();
expectPollInitialAssignment();
Set<TopicPartition> partitions = new HashSet<>(asList(TOPIC_PARTITION, TOPIC_PARTITION2));
EasyMock.expect(consumer.assignment()).andReturn(partitions);
consumer.pause(partitions);
PowerMock.expectLastCall();
PowerMock.replayAll();
workerTask.initialize(taskConfig);
workerTask.initializeAndStart();
workerTask.iteration();
time.sleep(10000L);
PowerMock.verifyAll();
}
private void expectInitializeTask() throws Exception { private void expectInitializeTask() throws Exception {
PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer); PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer);
consumer.subscribe(EasyMock.eq(asList(TOPIC)), EasyMock.capture(rebalanceListener)); consumer.subscribe(EasyMock.eq(asList(TOPIC)), EasyMock.capture(rebalanceListener));

View File

@ -443,7 +443,10 @@ public class WorkerTest extends ThreadedTest {
} catch (ConnectException e) { } catch (ConnectException e) {
// expected // expected
} }
List<Map<String, String>> taskConfigs = worker.connectorTaskConfigs(CONNECTOR_ID, 2, Arrays.asList("foo", "bar")); Map<String, String> connProps = new HashMap<>(props);
connProps.put(ConnectorConfig.TASKS_MAX_CONFIG, "2");
ConnectorConfig connConfig = new SinkConnectorConfig(plugins, connProps);
List<Map<String, String>> taskConfigs = worker.connectorTaskConfigs(CONNECTOR_ID, connConfig);
Map<String, String> expectedTaskProps = new HashMap<>(); Map<String, String> expectedTaskProps = new HashMap<>();
expectedTaskProps.put("foo", "bar"); expectedTaskProps.put("foo", "bar");
expectedTaskProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName()); expectedTaskProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());

View File

@ -168,12 +168,15 @@ public class DistributedHerderTest {
private ConfigBackingStore.UpdateListener configUpdateListener; private ConfigBackingStore.UpdateListener configUpdateListener;
private WorkerRebalanceListener rebalanceListener; private WorkerRebalanceListener rebalanceListener;
private SinkConnectorConfig conn1SinkConfig;
private SinkConnectorConfig conn1SinkConfigUpdated;
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
time = new MockTime(); time = new MockTime();
metrics = new MockConnectMetrics(time); metrics = new MockConnectMetrics(time);
worker = PowerMock.createMock(Worker.class); worker = PowerMock.createMock(Worker.class);
EasyMock.expect(worker.isSinkConnector(CONN1)).andStubReturn(Boolean.FALSE); EasyMock.expect(worker.isSinkConnector(CONN1)).andStubReturn(Boolean.TRUE);
herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"backoff", "connectorTypeForClass", "updateDeletedConnectorStatus"}, herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"backoff", "connectorTypeForClass", "updateDeletedConnectorStatus"},
new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time); new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time);
@ -181,6 +184,8 @@ public class DistributedHerderTest {
configUpdateListener = herder.new ConfigUpdateListener(); configUpdateListener = herder.new ConfigUpdateListener();
rebalanceListener = herder.new RebalanceListener(); rebalanceListener = herder.new RebalanceListener();
plugins = PowerMock.createMock(Plugins.class); plugins = PowerMock.createMock(Plugins.class);
conn1SinkConfig = new SinkConnectorConfig(plugins, CONN1_CONFIG);
conn1SinkConfigUpdated = new SinkConnectorConfig(plugins, CONN1_CONFIG_UPDATED);
EasyMock.expect(herder.connectorTypeForClass(BogusSourceConnector.class.getName())).andReturn(ConnectorType.SOURCE).anyTimes(); EasyMock.expect(herder.connectorTypeForClass(BogusSourceConnector.class.getName())).andReturn(ConnectorType.SOURCE).anyTimes();
pluginLoader = PowerMock.createMock(PluginClassLoader.class); pluginLoader = PowerMock.createMock(PluginClassLoader.class);
delegatingLoader = PowerMock.createMock(DelegatingClassLoader.class); delegatingLoader = PowerMock.createMock(DelegatingClassLoader.class);
@ -205,7 +210,7 @@ public class DistributedHerderTest {
PowerMock.expectLastCall().andReturn(true); PowerMock.expectLastCall().andReturn(true);
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true); PowerMock.expectLastCall().andReturn(true);
@ -232,7 +237,7 @@ public class DistributedHerderTest {
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true); PowerMock.expectLastCall().andReturn(true);
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true); PowerMock.expectLastCall().andReturn(true);
@ -248,7 +253,7 @@ public class DistributedHerderTest {
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true); PowerMock.expectLastCall().andReturn(true);
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
member.poll(EasyMock.anyInt()); member.poll(EasyMock.anyInt());
PowerMock.expectLastCall(); PowerMock.expectLastCall();
@ -279,7 +284,7 @@ public class DistributedHerderTest {
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true); PowerMock.expectLastCall().andReturn(true);
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true); PowerMock.expectLastCall().andReturn(true);
@ -558,7 +563,7 @@ public class DistributedHerderTest {
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true); PowerMock.expectLastCall().andReturn(true);
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
// And delete the connector // And delete the connector
member.wakeup(); member.wakeup();
@ -584,7 +589,7 @@ public class DistributedHerderTest {
@Test @Test
public void testRestartConnector() throws Exception { public void testRestartConnector() throws Exception {
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andStubReturn(TASK_CONFIGS); EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andStubReturn(TASK_CONFIGS);
// get the initial assignment // get the initial assignment
EasyMock.expect(member.memberId()).andStubReturn("leader"); EasyMock.expect(member.memberId()).andStubReturn("leader");
@ -740,7 +745,7 @@ public class DistributedHerderTest {
@Test @Test
public void testRestartTask() throws Exception { public void testRestartTask() throws Exception {
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andStubReturn(TASK_CONFIGS); EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andStubReturn(TASK_CONFIGS);
// get the initial assignment // get the initial assignment
EasyMock.expect(member.memberId()).andStubReturn("leader"); EasyMock.expect(member.memberId()).andStubReturn("leader");
@ -921,7 +926,7 @@ public class DistributedHerderTest {
PowerMock.expectLastCall().andReturn(true); PowerMock.expectLastCall().andReturn(true);
EasyMock.expect(worker.getPlugins()).andReturn(plugins); EasyMock.expect(worker.getPlugins()).andReturn(plugins);
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
member.poll(EasyMock.anyInt()); member.poll(EasyMock.anyInt());
PowerMock.expectLastCall(); PowerMock.expectLastCall();
@ -950,7 +955,7 @@ public class DistributedHerderTest {
PowerMock.expectLastCall().andReturn(true); PowerMock.expectLastCall().andReturn(true);
EasyMock.expect(worker.getPlugins()).andReturn(plugins); EasyMock.expect(worker.getPlugins()).andReturn(plugins);
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
member.poll(EasyMock.anyInt()); member.poll(EasyMock.anyInt());
PowerMock.expectLastCall(); PowerMock.expectLastCall();
@ -966,7 +971,7 @@ public class DistributedHerderTest {
PowerMock.expectLastCall().andReturn(true); PowerMock.expectLastCall().andReturn(true);
EasyMock.expect(worker.getPlugins()).andReturn(plugins); EasyMock.expect(worker.getPlugins()).andReturn(plugins);
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
member.poll(EasyMock.anyInt()); member.poll(EasyMock.anyInt());
PowerMock.expectLastCall(); PowerMock.expectLastCall();
@ -994,7 +999,7 @@ public class DistributedHerderTest {
PowerMock.expectLastCall().andReturn(true); PowerMock.expectLastCall().andReturn(true);
EasyMock.expect(worker.getPlugins()).andReturn(plugins); EasyMock.expect(worker.getPlugins()).andReturn(plugins);
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
member.poll(EasyMock.anyInt()); member.poll(EasyMock.anyInt());
PowerMock.expectLastCall(); PowerMock.expectLastCall();
@ -1047,7 +1052,7 @@ public class DistributedHerderTest {
// we expect reconfiguration after resuming // we expect reconfiguration after resuming
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
worker.setTargetState(CONN1, TargetState.STARTED); worker.setTargetState(CONN1, TargetState.STARTED);
PowerMock.expectLastCall(); PowerMock.expectLastCall();
@ -1241,7 +1246,7 @@ public class DistributedHerderTest {
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true); PowerMock.expectLastCall().andReturn(true);
EasyMock.expect(worker.getPlugins()).andReturn(plugins); EasyMock.expect(worker.getPlugins()).andReturn(plugins);
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(), worker.startTask(EasyMock.eq(TASK1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true); PowerMock.expectLastCall().andReturn(true);
@ -1317,7 +1322,7 @@ public class DistributedHerderTest {
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true); PowerMock.expectLastCall().andReturn(true);
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
// list connectors, get connector info, get connector config, get task configs // list connectors, get connector info, get connector config, get task configs
member.wakeup(); member.wakeup();
@ -1356,7 +1361,7 @@ public class DistributedHerderTest {
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true); PowerMock.expectLastCall().andReturn(true);
EasyMock.expect(worker.isRunning(CONN1)).andReturn(true); EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS); EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfigUpdated)).andReturn(TASK_CONFIGS);
member.poll(EasyMock.anyInt()); member.poll(EasyMock.anyInt());
PowerMock.expectLastCall(); PowerMock.expectLastCall();

View File

@ -30,6 +30,8 @@ import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.ConnectorStatus; import org.apache.kafka.connect.runtime.ConnectorStatus;
import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.HerderConnectorContext; import org.apache.kafka.connect.runtime.HerderConnectorContext;
import org.apache.kafka.connect.runtime.SinkConnectorConfig;
import org.apache.kafka.connect.runtime.SourceConnectorConfig;
import org.apache.kafka.connect.runtime.TargetState; import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskConfig; import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.runtime.TaskStatus; import org.apache.kafka.connect.runtime.TaskStatus;
@ -479,7 +481,7 @@ public class StandaloneHerderTest {
EasyMock.expectLastCall().andReturn(true); EasyMock.expectLastCall().andReturn(true);
EasyMock.expect(worker.isRunning(CONNECTOR_NAME)).andReturn(true); EasyMock.expect(worker.isRunning(CONNECTOR_NAME)).andReturn(true);
// Generate same task config, which should result in no additional action to restart tasks // Generate same task config, which should result in no additional action to restart tasks
EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, null)) EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(plugins, newConnConfig)))
.andReturn(singletonList(taskConfig(SourceSink.SOURCE))); .andReturn(singletonList(taskConfig(SourceSink.SOURCE)));
worker.isSinkConnector(CONNECTOR_NAME); worker.isSinkConnector(CONNECTOR_NAME);
EasyMock.expectLastCall().andReturn(false); EasyMock.expectLastCall().andReturn(false);
@ -562,6 +564,9 @@ public class StandaloneHerderTest {
private void expectAdd(SourceSink sourceSink) throws Exception { private void expectAdd(SourceSink sourceSink) throws Exception {
Map<String, String> connectorProps = connectorConfig(sourceSink); Map<String, String> connectorProps = connectorConfig(sourceSink);
ConnectorConfig connConfig = sourceSink == SourceSink.SOURCE ?
new SourceConnectorConfig(plugins, connectorProps) :
new SinkConnectorConfig(plugins, connectorProps);
worker.startConnector(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(connectorProps), EasyMock.anyObject(HerderConnectorContext.class), worker.startConnector(EasyMock.eq(CONNECTOR_NAME), EasyMock.eq(connectorProps), EasyMock.anyObject(HerderConnectorContext.class),
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED)); EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
@ -577,7 +582,7 @@ public class StandaloneHerderTest {
Map<String, String> generatedTaskProps = taskConfig(sourceSink); Map<String, String> generatedTaskProps = taskConfig(sourceSink);
EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, sourceSink == SourceSink.SINK ? TOPICS_LIST : null)) EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, connConfig))
.andReturn(singletonList(generatedTaskProps)); .andReturn(singletonList(generatedTaskProps));
worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), connectorConfig(sourceSink), generatedTaskProps, herder, TargetState.STARTED); worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), connectorConfig(sourceSink), generatedTaskProps, herder, TargetState.STARTED);

View File

@ -95,9 +95,10 @@
<p>The <code>connector.class</code> config supports several formats: the full name or alias of the class for this connector. If the connector is org.apache.kafka.connect.file.FileStreamSinkConnector, you can either specify this full name or use FileStreamSink or FileStreamSinkConnector to make the configuration a bit shorter.</p> <p>The <code>connector.class</code> config supports several formats: the full name or alias of the class for this connector. If the connector is org.apache.kafka.connect.file.FileStreamSinkConnector, you can either specify this full name or use FileStreamSink or FileStreamSinkConnector to make the configuration a bit shorter.</p>
<p>Sink connectors also have one additional option to control their input:</p> <p>Sink connectors also have a few additional options to control their input. Each sink connector must set one of the following:</p>
<ul> <ul>
<li><code>topics</code> - A list of topics to use as input for this connector</li> <li><code>topics</code> - A comma-separated list of topics to use as input for this connector</li>
<li><code>topics.regex</code> - A Java regular expression of topics to use as input for this connector</li>
</ul> </ul>
<p>For any other options, you should consult the documentation for the connector.</p> <p>For any other options, you should consult the documentation for the connector.</p>