Switch to using new consumer coordinator instead of manually assigning partitions. Remove dependency of copycat-runtime on core.

This commit is contained in:
Ewen Cheslack-Postava 2015-07-28 23:42:15 -07:00
parent 4a9b4f3c67
commit dec137998a
20 changed files with 38 additions and 296 deletions

View File

@ -604,8 +604,6 @@ project(':copycat-runtime') {
compile project(':copycat-avro')
compile project(':clients')
compile "org.slf4j:slf4j-api:1.7.6"
// FIXME we shouldn't depend on core since we only need clients, but currently we're doing sink task topic-partition assignment manually until we switch over to new consumer group functionality
compile project(':core')
testCompile 'junit:junit:4.6'
testCompile 'org.easymock:easymock:3.3.1'

View File

@ -135,10 +135,6 @@
<!-- for tests -->
<allow pkg="org.easymock" />
<allow pkg="org.powermock" />
<!-- FIXME Remove when dependency on core for sink partition assignment is removed -->
<allow pkg="kafka.utils" />
<allow pkg="org.I0Itec.zkclient" />
</subpackage>
<subpackage name="cli">
@ -156,11 +152,6 @@
<subpackage name="util">
<allow pkg="org.apache.kafka.copycat" />
<!-- FIXME Remove when dependency on core for sink partition assignment is removed -->
<allow pkg="kafka.utils" />
<allow pkg="org.I0Itec.zkclient" />
<allow pkg="scala" />
</subpackage>
<subpackage name="file">

View File

@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
zookeeper.connect=localhost:2181
bootstrap.servers=localhost:9092
schema.registry.url=http://localhost:8081

View File

@ -14,6 +14,5 @@
# limitations under the License.
# These are defaults. This file just demonstrates how to override some settings.
zookeeper.connect=localhost:2181
bootstrap.servers=localhost:9092
schema.registry.url=http://localhost:8081

View File

@ -17,8 +17,6 @@
package org.apache.kafka.copycat.connector;
import java.util.List;
/**
* ConnectorContext allows Connectors to proactively interact with the Copycat runtime.
*/
@ -29,21 +27,4 @@ public interface ConnectorContext {
* added/removed) and the running Tasks will need to be modified.
*/
public void requestTaskReconfiguration();
/**
* Get a list of TopicPartitions for the specified topics. This should be used to determine how
* to divide TopicPartitions between child tasks.
* @param topics list of topics to get partitions for
* @return list of all TopicPartitions for the input topics
*/
public abstract List<TopicPartition> getTopicPartitions(String... topics);
/**
* Get a list of TopicPartitions for the specified topics. This should be used to determine how
* to divide TopicPartitions between child tasks.
* @param topics list of topics to get partitions for
* @return list of all TopicPartitions for the input topics
*/
public abstract List<TopicPartition> getTopicPartitions(List<String> topics);
}

View File

@ -32,16 +32,11 @@ public abstract class SinkTask implements Task {
/**
* <p>
* The configuration key that provides the list of topic partitions that are inputs for this
* The configuration key that provides the list of topics that are inputs for this
* SinkTask.
* </p>
* <p>
* Usually this setting is only used by the Copycat framework since it manages the Kafka
* consumer that provides input records. However, it is provided here for the convenience of
* SinkTask implementations that may also want to know the input set of topic partitions.
* </p>
*/
public static final String TOPICPARTITIONS_CONFIG = "topic.partitions";
public static final String TOPICS_CONFIG = "topics";
protected SinkTaskContext context;

View File

@ -21,7 +21,6 @@ import org.apache.kafka.copycat.connector.ConnectorContext;
import org.apache.kafka.copycat.connector.TopicPartition;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.sink.SinkConnector;
import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;
import org.powermock.api.easymock.PowerMock;
@ -84,9 +83,4 @@ public class FileStreamSinkConnectorTest {
PowerMock.verifyAll();
}
private void expectGetTopicPartitionsMultiple() {
EasyMock.expect(ctx.getTopicPartitions(MULTIPLE_TOPICS_LIST[0], MULTIPLE_TOPICS_LIST[1]))
.andReturn(MULTIPLE_TOPICS_PARTITIONS);
}
}

View File

@ -20,7 +20,6 @@ package org.apache.kafka.copycat.file;
import org.apache.kafka.copycat.connector.TopicPartition;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.sink.SinkRecord;
import org.apache.kafka.copycat.sink.SinkTask;
import org.junit.Before;
import org.junit.Test;
@ -45,8 +44,6 @@ public class FileStreamSinkTaskTest {
printStream = new PrintStream(os);
task = new FileStreamSinkTask(printStream);
config = new Properties();
config.setProperty(SinkTask.TOPICPARTITIONS_CONFIG, "topic1-1,topic2-2");
}
@Test

View File

@ -74,9 +74,8 @@ public class Copycat {
FutureCallback cb = new FutureCallback(new Callback<Void>() {
@Override
public void onCompletion(Throwable error, Void result) {
if (error != null) {
if (error != null)
log.error("Failed to stop job {}", connName);
}
}
});
coordinator.deleteConnector(connName, cb);
@ -84,15 +83,13 @@ public class Copycat {
}
// Create any new connectors
for (final String connectorPropsFile : config
.getList(CopycatConfig.CREATE_CONNECTORS_CONFIG)) {
for (final String connectorPropsFile : config.getList(CopycatConfig.CREATE_CONNECTORS_CONFIG)) {
connectorProps = Utils.loadProps(connectorPropsFile);
FutureCallback cb = new FutureCallback(new Callback<String>() {
@Override
public void onCompletion(Throwable error, String id) {
if (error != null) {
if (error != null)
log.error("Failed to create job for {}", connectorPropsFile);
}
}
});
coordinator.addConnector(connectorProps, cb);

View File

@ -37,30 +37,6 @@ public class WorkerConfig extends AbstractConfig {
+ "or instances may co-exist while sharing a single Kafka cluster.";
public static final String CLUSTER_DEFAULT = "copycat";
public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect";
private static final String ZOOKEEPER_CONNECT_DOC =
"Specifies the ZooKeeper connection string in the form "
+ "hostname:port where host and port are the host and port of a ZooKeeper server. To allow connecting "
+ "through other ZooKeeper nodes when that ZooKeeper machine is down you can also specify multiple hosts "
+ "in the form hostname1:port1,hostname2:port2,hostname3:port3.\n"
+ "\n"
+ "The server may also have a ZooKeeper chroot path as part of it's ZooKeeper connection string which puts "
+ "its data under some path in the global ZooKeeper namespace. If so the consumer should use the same "
+ "chroot path in its connection string. For example to give a chroot path of /chroot/path you would give "
+ "the connection string as hostname1:port1,hostname2:port2,hostname3:port3/chroot/path.";
public static final String ZOOKEEPER_CONNECT_DEFAULT = "localhost:2181";
public static final String ZOOKEEPER_SESSION_TIMEOUT_MS_CONFIG = "zookeeper.session.timeout.ms";
private static final String ZOOKEEPER_SESSION_TIMEOUT_MS_DOC
= "Session timeout for ZooKeeper connections.";
public static final String ZOOKEEPER_SESSION_TIMEOUT_MS_DEFAULT = "30000";
public static final String ZOOKEEPER_CONNECTION_TIMEOUT_MS_CONFIG
= "zookeeper.session.connection.ms";
private static final String ZOOKEEPER_CONNECTION_TIMEOUT_MS_DOC
= "Connection timeout for ZooKeeper.";
public static final String ZOOKEEPER_CONNECTION_TIMEOUT_MS_DEFAULT = "30000";
public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
public static final String BOOSTRAP_SERVERS_DOC
= "A list of host/port pairs to use for establishing the initial connection to the Kafka "
@ -159,14 +135,6 @@ public class WorkerConfig extends AbstractConfig {
static {
config = new ConfigDef()
.define(CLUSTER_CONFIG, Type.STRING, CLUSTER_DEFAULT, Importance.HIGH, CLUSTER_CONFIG_DOC)
.define(ZOOKEEPER_CONNECT_CONFIG, Type.STRING, ZOOKEEPER_CONNECT_DEFAULT,
Importance.HIGH, ZOOKEEPER_CONNECT_DOC)
.define(ZOOKEEPER_SESSION_TIMEOUT_MS_CONFIG, Type.INT,
ZOOKEEPER_SESSION_TIMEOUT_MS_DEFAULT,
Importance.LOW, ZOOKEEPER_SESSION_TIMEOUT_MS_DOC)
.define(ZOOKEEPER_CONNECTION_TIMEOUT_MS_CONFIG, Type.INT,
ZOOKEEPER_CONNECTION_TIMEOUT_MS_DEFAULT,
Importance.LOW, ZOOKEEPER_CONNECTION_TIMEOUT_MS_DOC)
.define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, BOOTSTRAP_SERVERS_DEFAULT,
Importance.HIGH, BOOSTRAP_SERVERS_DOC)
.define(CONVERTER_CLASS_CONFIG, Type.CLASS, CONVERTER_CLASS_DEFAULT,

View File

@ -19,8 +19,6 @@ package org.apache.kafka.copycat.runtime;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.utils.Utils;
@ -62,7 +60,6 @@ public class Worker {
private OffsetDeserializer offsetValueDeserializer;
private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<ConnectorTaskId, WorkerTask>();
private KafkaProducer producer;
private ZkClient zkClient;
private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
public Worker(WorkerConfig config) {
@ -70,14 +67,12 @@ public class Worker {
Reflection.instantiateConfigurable(
config.getClass(WorkerConfig.OFFSET_STORAGE_CLASS_CONFIG).getName(),
OffsetBackingStore.class, config.getUnusedProperties()),
null, null, null, null,
createZkClient(config));
null, null, null, null);
}
public Worker(Time time, WorkerConfig config, OffsetBackingStore offsetBackingStore,
OffsetSerializer offsetKeySerializer, OffsetSerializer offsetValueSerializer,
OffsetDeserializer offsetKeyDeserializer, OffsetDeserializer offsetValueDeserializer,
ZkClient zkClient) {
OffsetDeserializer offsetKeyDeserializer, OffsetDeserializer offsetValueDeserializer) {
this.time = time;
this.config = config;
this.converter = Reflection.instantiate(config.getClass(WorkerConfig.CONVERTER_CLASS_CONFIG).getName(),
@ -119,16 +114,6 @@ public class Worker {
OffsetDeserializer.class);
this.offsetValueDeserializer.configure(config.getOriginalProperties(), false);
}
this.zkClient = zkClient;
}
private static ZkClient createZkClient(WorkerConfig config) {
return new ZkClient(config.getString(WorkerConfig.ZOOKEEPER_CONNECT_CONFIG),
config.getInt(WorkerConfig.ZOOKEEPER_SESSION_TIMEOUT_MS_CONFIG),
config.getInt(WorkerConfig.ZOOKEEPER_CONNECTION_TIMEOUT_MS_CONFIG),
ZKStringSerializer$.MODULE$);
}
public void start() {
@ -245,10 +230,6 @@ public class Worker {
tasks.remove(id);
}
public ZkClient getZkClient() {
return zkClient;
}
private WorkerTask getTask(ConnectorTaskId id) {
WorkerTask task = tasks.get(id);
if (task == null) {

View File

@ -157,6 +157,12 @@ public class WorkerSinkTask implements WorkerTask {
}
private KafkaConsumer<Object, Object> createConsumer(Properties taskProps) {
String topicsStr = taskProps.getProperty(SinkTask.TOPICS_CONFIG);
if (topicsStr == null || topicsStr.isEmpty()) {
throw new CopycatRuntimeException("Sink tasks require a list of topics.");
}
String[] topics = topicsStr.split(",");
// Include any unknown worker configs so consumer configs can be set globally on the worker
// and through to the task
Properties props = workerConfig.getUnusedProperties();
@ -177,17 +183,21 @@ public class WorkerSinkTask implements WorkerTask {
throw new CopycatRuntimeException("Failed to create consumer", t);
}
List<TopicPartition> topicPartitions = getInputTopicPartitions(taskProps);
log.debug("Task {} subscribing to topic-partitions {}", id, topicPartitions);
log.debug("Task {} subscribing to topics {}", id, topics);
newConsumer.subscribe(topics);
// Seek to any user-provided offsets. This is useful if offsets are tracked in the downstream system (e.g., to
// enable exactly once delivery to that system).
//
// To do this correctly, we need to first make sure we have been assigned partitions, which poll() will guarantee.
// We ask for offsets after this poll to make sure any offsets committed before the rebalance are picked up correctly.
newConsumer.poll(0);
Map<TopicPartition, Long> offsets = context.getOffsets();
for (TopicPartition tp : topicPartitions) {
org.apache.kafka.common.TopicPartition kafkatp = new
org.apache.kafka.common.TopicPartition(tp.topic(), tp.partition());
newConsumer.subscribe(kafkatp);
if (offsets.containsKey(tp)) {
long offset = offsets.get(tp);
for (org.apache.kafka.common.TopicPartition kafkatp : newConsumer.subscriptions()) {
TopicPartition tp = new TopicPartition(kafkatp.topic(), kafkatp.partition());
Long offset = offsets.get(tp);
if (offset != null)
newConsumer.seek(kafkatp, offset);
}
}
return newConsumer;
}
@ -196,24 +206,6 @@ public class WorkerSinkTask implements WorkerTask {
return new WorkerSinkTaskThread(this, "WorkerSinkTask-" + id, time, workerConfig);
}
private List<TopicPartition> getInputTopicPartitions(Properties taskProps) {
String topicPartitionsStr = taskProps.getProperty(SinkTask.TOPICPARTITIONS_CONFIG);
if (topicPartitionsStr == null || topicPartitionsStr.isEmpty()) {
throw new CopycatRuntimeException("Sink tasks require a list of topic partitions, which "
+ "copycat should generate automatically. This might "
+ "indicate your Task class inherits from SinkTask, but "
+ "your Connector class does not inherit from "
+ "SinkConnector.");
}
List<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
for (String topicPartitionStr : Arrays.asList(topicPartitionsStr.split(","))) {
topicPartitions.add(new TopicPartition(topicPartitionStr));
}
return topicPartitions;
}
private void deliverMessages(ConsumerRecords<Object, Object> msgs) {
// Finally, deliver this batch to the sink
if (msgs.count() > 0) {

View File

@ -17,12 +17,7 @@
package org.apache.kafka.copycat.runtime.standalone;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.copycat.connector.ConnectorContext;
import org.apache.kafka.copycat.connector.TopicPartition;
import org.apache.kafka.copycat.util.KafkaUtils;
import java.util.List;
/**
* ConnectorContext for use with the StandaloneCoordinator, which maintains all connectors and tasks
@ -32,13 +27,10 @@ public class StandaloneConnectorContext implements ConnectorContext {
private StandaloneCoordinator coordinator;
private String connectorName;
private ZkClient zkClient;
public StandaloneConnectorContext(StandaloneCoordinator coordinator, String connectorName,
ZkClient zkClient) {
public StandaloneConnectorContext(StandaloneCoordinator coordinator, String connectorName) {
this.coordinator = coordinator;
this.connectorName = connectorName;
this.zkClient = zkClient;
}
@Override
@ -47,14 +39,4 @@ public class StandaloneConnectorContext implements ConnectorContext {
// process
coordinator.requestTaskReconfiguration(connectorName);
}
@Override
public List<TopicPartition> getTopicPartitions(String... topics) {
return KafkaUtils.getTopicPartitions(zkClient, topics);
}
@Override
public List<TopicPartition> getTopicPartitions(List<String> topics) {
return KafkaUtils.getTopicPartitions(zkClient, topics);
}
}

View File

@ -19,7 +19,6 @@ package org.apache.kafka.copycat.runtime.standalone;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.copycat.connector.Connector;
import org.apache.kafka.copycat.connector.TopicPartition;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
import org.apache.kafka.copycat.runtime.ConnectorConfig;
@ -141,7 +140,7 @@ public class StandaloneCoordinator implements Coordinator {
// may be caused by user code
throw new CopycatRuntimeException("Failed to create connector instance", t);
}
connector.initialize(new StandaloneConnectorContext(this, connName, worker.getZkClient()));
connector.initialize(new StandaloneConnectorContext(this, connName));
try {
connector.start(configs);
} catch (CopycatException e) {
@ -194,32 +193,21 @@ public class StandaloneCoordinator implements Coordinator {
log.info("Creating tasks for connector {} of type {}", state.name, taskClassName);
int maxTasks = state.maxTasks;
// For sink tasks, we may also be limited by the number of input topic partitions
List<TopicPartition> topicPartitions = null;
if (state.connector instanceof SinkConnector) {
topicPartitions = KafkaUtils.getTopicPartitions(worker.getZkClient(), state.inputTopics);
maxTasks = Math.min(maxTasks, topicPartitions.size());
}
List<Properties> taskConfigs = state.connector.getTaskConfigs(maxTasks);
// If necessary, figure out how to distribute input topic partitions
// TODO: This needs to run periodically so we detect new partitions
List<List<TopicPartition>> taskAssignments = null;
if (state.connector instanceof SinkConnector) {
taskAssignments = ConnectorUtils.groupPartitions(topicPartitions, taskConfigs.size());
}
List<Properties> taskConfigs = state.connector.getTaskConfigs(state.maxTasks);
// Generate the final configs, including framework provided settings
Map<ConnectorTaskId, Properties> taskProps = new HashMap<ConnectorTaskId, Properties>();
for (int i = 0; i < taskConfigs.size(); i++) {
ConnectorTaskId taskId = new ConnectorTaskId(state.name, i);
Properties config = taskConfigs.get(i);
// TODO: This probably shouldn't be in the Coordinator. It's nice to have Copycat ensure the list of topics
// is automatically provided to tasks since it is required by the framework, but this
String subscriptionTopics = Utils.join(state.inputTopics, ",");
if (state.connector instanceof SinkConnector) {
// Make sure we don't modify the original since the connector may reuse it internally
Properties configForSink = new Properties();
configForSink.putAll(config);
configForSink.setProperty(SinkTask.TOPICPARTITIONS_CONFIG, Utils.join(taskAssignments.get(i), ","));
configForSink.setProperty(SinkTask.TOPICS_CONFIG, subscriptionTopics);
config = configForSink;
}
taskProps.put(taskId, config);

View File

@ -1,53 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.util;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.copycat.connector.TopicPartition;
import scala.collection.Seq;
import scala.collection.mutable.Map;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static scala.collection.JavaConversions.*;
/**
* Some utilities for working with Kafka that aren't included with Kafka itself.
*/
public class KafkaUtils {
public static List<TopicPartition> getTopicPartitions(ZkClient zkClient, String... topics) {
return getTopicPartitions(zkClient, Arrays.asList(topics));
}
public static List<TopicPartition> getTopicPartitions(ZkClient zkClient, List<String> topics) {
Seq<String> scalaTopics = asScalaIterable(topics).toSeq();
List<TopicPartition> result = new ArrayList<TopicPartition>();
Map<String, Seq<Object>> partitionsByTopic
= ZkUtils.getPartitionsForTopics(zkClient, scalaTopics);
for (java.util.Map.Entry<String, Seq<Object>> entry : asJavaMap(partitionsByTopic).entrySet()) {
for (Object partition : asJavaIterable(entry.getValue())) {
result.add(new TopicPartition(entry.getKey(), (Integer) partition));
}
}
return result;
}
}

View File

@ -25,7 +25,6 @@ import org.apache.kafka.copycat.data.GenericRecord;
import org.apache.kafka.copycat.data.GenericRecordBuilder;
import org.apache.kafka.copycat.data.Schema;
import org.apache.kafka.copycat.data.SchemaBuilder;
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
import org.apache.kafka.copycat.sink.SinkRecord;
import org.apache.kafka.copycat.sink.SinkTask;
import org.apache.kafka.copycat.sink.SinkTaskContext;
@ -88,21 +87,9 @@ public class WorkerSinkTaskTest extends ThreadedTest {
recordsReturned = 0;
}
@Test
public void testGetInputTopicPartitions() throws Exception {
Properties props = new Properties();
props.setProperty(SinkTask.TOPICPARTITIONS_CONFIG, "topic-1,foo-2");
assertEquals(
Arrays.asList(new org.apache.kafka.copycat.connector.TopicPartition("topic", 1),
new org.apache.kafka.copycat.connector.TopicPartition("foo", 2)),
Whitebox.invokeMethod(workerTask, "getInputTopicPartitions", props)
);
}
@Test
public void testPollsInBackground() throws Exception {
Properties taskProps = new Properties();
taskProps.setProperty(SinkTask.TOPICPARTITIONS_CONFIG, TOPIC_PARTITION_STR);
expectInitializeTask(taskProps);
Capture<Collection<SinkRecord>> capturedRecords = expectPolls(1L);
@ -170,7 +157,6 @@ public class WorkerSinkTaskTest extends ThreadedTest {
@Test
public void testCommit() throws Exception {
Properties taskProps = new Properties();
taskProps.setProperty(SinkTask.TOPICPARTITIONS_CONFIG, TOPIC_PARTITION_STR);
expectInitializeTask(taskProps);
// Make each poll() take the offset commit interval
@ -199,7 +185,6 @@ public class WorkerSinkTaskTest extends ThreadedTest {
@Test
public void testCommitTaskFlushFailure() throws Exception {
Properties taskProps = new Properties();
taskProps.setProperty(SinkTask.TOPICPARTITIONS_CONFIG, TOPIC_PARTITION_STR);
expectInitializeTask(taskProps);
Capture<Collection<SinkRecord>> capturedRecords
@ -224,7 +209,6 @@ public class WorkerSinkTaskTest extends ThreadedTest {
@Test
public void testCommitConsumerFailure() throws Exception {
Properties taskProps = new Properties();
taskProps.setProperty(SinkTask.TOPICPARTITIONS_CONFIG, TOPIC_PARTITION_STR);
expectInitializeTask(taskProps);
Capture<Collection<SinkRecord>> capturedRecords
@ -250,7 +234,6 @@ public class WorkerSinkTaskTest extends ThreadedTest {
@Test
public void testCommitTimeout() throws Exception {
Properties taskProps = new Properties();
taskProps.setProperty(SinkTask.TOPICPARTITIONS_CONFIG, TOPIC_PARTITION_STR);
expectInitializeTask(taskProps);
// Cut down amount of time to pass in each poll so we trigger exactly 1 offset commit
@ -277,34 +260,6 @@ public class WorkerSinkTaskTest extends ThreadedTest {
PowerMock.verifyAll();
}
@Test
public void testGetInputPartitionsSingle() throws Exception {
Properties taskProps = new Properties();
taskProps.setProperty(SinkTask.TOPICPARTITIONS_CONFIG, "test-1");
assertEquals(Arrays.asList(new org.apache.kafka.copycat.connector.TopicPartition("test", 1)),
Whitebox.invokeMethod(workerTask, "getInputTopicPartitions", taskProps));
}
@Test
public void testGetInputPartitionsList() throws Exception {
Properties taskProps = new Properties();
taskProps.setProperty(SinkTask.TOPICPARTITIONS_CONFIG, "test-1,foo-2,bar-3");
assertEquals(Arrays.asList(
new org.apache.kafka.copycat.connector.TopicPartition("test", 1),
new org.apache.kafka.copycat.connector.TopicPartition("foo", 2),
new org.apache.kafka.copycat.connector.TopicPartition("bar", 3)),
Whitebox.invokeMethod(workerTask, "getInputTopicPartitions", taskProps));
}
@Test(expected = CopycatRuntimeException.class)
public void testGetInputPartitionsMissing() throws Exception {
// Missing setting
Whitebox.invokeMethod(workerTask, "getInputTopicPartitions", new Properties());
}
private KafkaConsumer<Object, Object> expectInitializeTask(Properties taskProps)
throws Exception {
sinkTask.initialize(EasyMock.anyObject(SinkTaskContext.class));

View File

@ -18,7 +18,6 @@
package org.apache.kafka.copycat.runtime;
import org.apache.kafka.common.utils.Time;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.copycat.cli.WorkerConfig;
import org.apache.kafka.copycat.errors.CopycatException;
@ -62,11 +61,9 @@ public class WorkerTest extends ThreadedTest {
Properties workerProps = new Properties();
workerProps.setProperty("schema.registry.url", "http://localhost:8081");
WorkerConfig config = new WorkerConfig(workerProps);
ZkClient zkClient = PowerMock.createMock(ZkClient.class);
worker = new Worker(new MockTime(), config, offsetBackingStore,
offsetKeySerializer, offsetValueSerializer,
offsetKeyDeserializer, offsetValueDeserializer,
zkClient);
offsetKeyDeserializer, offsetValueDeserializer);
worker.start();
}

View File

@ -21,7 +21,6 @@ import org.apache.kafka.copycat.runtime.ConnectorConfig;
import org.apache.kafka.copycat.runtime.Worker;
import org.apache.kafka.copycat.sink.SinkConnector;
import org.apache.kafka.copycat.util.Callback;
import org.apache.kafka.copycat.util.KafkaUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -35,7 +34,7 @@ import java.io.File;
import java.util.Properties;
@RunWith(PowerMockRunner.class)
@PrepareForTest({StandaloneCoordinator.class, KafkaUtils.class})
@PrepareForTest({StandaloneCoordinator.class})
@PowerMockIgnore("javax.management.*")
public class StandaloneCoordinatorRestoreTest extends StandaloneCoordinatorTestBase {
private File coordinatorConfigFile;
@ -57,7 +56,6 @@ public class StandaloneCoordinatorRestoreTest extends StandaloneCoordinatorTestB
connectorProps.setProperty(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
connectorProps.setProperty(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR);
PowerMock.mockStatic(StandaloneCoordinator.class);
PowerMock.mockStatic(KafkaUtils.class);
// These can be anything since connectors can pass along whatever they want.
taskProps = new Properties();

View File

@ -22,7 +22,6 @@ import org.apache.kafka.copycat.runtime.Worker;
import org.apache.kafka.copycat.sink.SinkConnector;
import org.apache.kafka.copycat.util.Callback;
import org.apache.kafka.copycat.util.FutureCallback;
import org.apache.kafka.copycat.util.KafkaUtils;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -35,7 +34,7 @@ import java.util.Properties;
import java.util.concurrent.TimeUnit;
@RunWith(PowerMockRunner.class)
@PrepareForTest({StandaloneCoordinator.class, KafkaUtils.class})
@PrepareForTest({StandaloneCoordinator.class})
@PowerMockIgnore("javax.management.*")
public class StandaloneCoordinatorTest extends StandaloneCoordinatorTestBase {
@ -49,7 +48,6 @@ public class StandaloneCoordinatorTest extends StandaloneCoordinatorTestBase {
connectorProps.setProperty(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
connectorProps.setProperty(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR);
PowerMock.mockStatic(StandaloneCoordinator.class);
PowerMock.mockStatic(KafkaUtils.class);
// These can be anything since connectors can pass along whatever they want.
taskProps = new Properties();

View File

@ -17,7 +17,6 @@
package org.apache.kafka.copycat.runtime.standalone;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.copycat.connector.Connector;
import org.apache.kafka.copycat.connector.Task;
import org.apache.kafka.copycat.connector.TopicPartition;
@ -30,22 +29,16 @@ import org.apache.kafka.copycat.source.SourceConnector;
import org.apache.kafka.copycat.source.SourceTask;
import org.apache.kafka.copycat.util.Callback;
import org.apache.kafka.copycat.util.ConnectorTaskId;
import org.apache.kafka.copycat.util.KafkaUtils;
import org.easymock.EasyMock;
import org.powermock.api.easymock.PowerMock;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
public class StandaloneCoordinatorTestBase {
protected static final String CONNECTOR_NAME = "test";
protected static final String TOPICS_LIST_STR = "topic1,topic2";
protected static final List<String> TOPICS_LIST = Arrays.asList("topic1", "topic2");
protected static final List<TopicPartition> TOPIC_PARTITIONS = Arrays.asList(
new TopicPartition("topic1", 1), new TopicPartition("topic2", 1));
protected static final String TOPIC_PARTITIONS_STR = "topic1-1,topic2-1";
protected StandaloneCoordinator coordinator;
protected Worker worker;
@ -79,8 +72,6 @@ public class StandaloneCoordinatorTestBase {
PowerMock.expectLastCall();
}
ZkClient zkClient = PowerMock.createMock(ZkClient.class);
EasyMock.expect(worker.getZkClient()).andStubReturn(zkClient);
connector.initialize(EasyMock.anyObject(StandaloneConnectorContext.class));
PowerMock.expectLastCall();
connector.start(new Properties());
@ -89,20 +80,14 @@ public class StandaloneCoordinatorTestBase {
// Just return the connector properties for the individual task we generate by default
EasyMock.<Class<? extends Task>>expect(connector.getTaskClass()).andReturn(taskClass);
if (sink) {
EasyMock.expect(KafkaUtils.getTopicPartitions(zkClient, TOPICS_LIST))
.andReturn(TOPIC_PARTITIONS);
}
EasyMock.expect(connector.getTaskConfigs(ConnectorConfig.TASKS_MAX_DEFAULT))
.andReturn(Arrays.asList(taskProps));
// And we should instantiate the tasks. For a sink task, we should see added properties for
// the input topic partitions
Properties generatedTaskProps = new Properties();
generatedTaskProps.putAll(taskProps);
if (sink) {
generatedTaskProps.setProperty(SinkTask.TOPICPARTITIONS_CONFIG, TOPIC_PARTITIONS_STR);
}
if (sink)
generatedTaskProps.setProperty(SinkTask.TOPICS_CONFIG, TOPICS_LIST_STR);
worker.addTask(new ConnectorTaskId(CONNECTOR_NAME, 0), taskClass.getName(), generatedTaskProps);
PowerMock.expectLastCall();
}