mirror of https://github.com/apache/kafka.git
Switch to using new consumer coordinator instead of manually assigning partitions. Remove dependency of copycat-runtime on core.
This commit is contained in:
parent 4a9b4f3c67
commit dec137998a
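In short, the commit replaces hand-rolled, ZooKeeper-backed partition assignment for sink tasks with Kafka's consumer group coordinator: tasks subscribe by topic and the broker-side coordinator hands out partitions. A minimal sketch of that pattern, assuming a recent Java client (the in-flight client in this diff used a varargs `subscribe(String...)`; the topic and group names here are illustrative, not from the commit):

```java
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class GroupSubscribeSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "copycat-sink-example"); // tasks sharing a group split partitions
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        // Subscribe by topic name; the group coordinator assigns partitions at
        // rebalance time, so no ZooKeeper lookups or manual assignment math.
        consumer.subscribe(Arrays.asList("topic1", "topic2"));
        ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
        System.out.println("got " + records.count() + " records");
        consumer.close();
    }
}
```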
@@ -604,8 +604,6 @@ project(':copycat-runtime') {
     compile project(':copycat-avro')
     compile project(':clients')
     compile "org.slf4j:slf4j-api:1.7.6"
-    // FIXME we shouldn't depend on core since we only need clients, but currently we're doing sink task topic-partition assignment manually until we switch over to new consumer group functionality
-    compile project(':core')

     testCompile 'junit:junit:4.6'
     testCompile 'org.easymock:easymock:3.3.1'
@@ -135,10 +135,6 @@
         <!-- for tests -->
         <allow pkg="org.easymock" />
         <allow pkg="org.powermock" />
-
-        <!-- FIXME Remove when dependency on core for sink partition assignment is removed -->
-        <allow pkg="kafka.utils" />
-        <allow pkg="org.I0Itec.zkclient" />
     </subpackage>

     <subpackage name="cli">
@@ -156,11 +152,6 @@

     <subpackage name="util">
         <allow pkg="org.apache.kafka.copycat" />
-
-        <!-- FIXME Remove when dependency on core for sink partition assignment is removed -->
-        <allow pkg="kafka.utils" />
-        <allow pkg="org.I0Itec.zkclient" />
-        <allow pkg="scala" />
     </subpackage>

     <subpackage name="file">
@@ -13,7 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-zookeeper.connect=localhost:2181
 bootstrap.servers=localhost:9092
 schema.registry.url=http://localhost:8081

@@ -14,6 +14,5 @@
 # limitations under the License.

 # These are defaults. This file just demonstrates how to override some settings.
-zookeeper.connect=localhost:2181
 bootstrap.servers=localhost:9092
 schema.registry.url=http://localhost:8081
@@ -17,8 +17,6 @@

 package org.apache.kafka.copycat.connector;

-import java.util.List;
-
 /**
  * ConnectorContext allows Connectors to proactively interact with the Copycat runtime.
  */
@@ -29,21 +27,4 @@ public interface ConnectorContext {
      * added/removed) and the running Tasks will need to be modified.
      */
    public void requestTaskReconfiguration();
-
-    /**
-     * Get a list of TopicPartitions for the specified topics. This should be used to determine how
-     * to divide TopicPartitions between child tasks.
-     * @param topics list of topics to get partitions for
-     * @return list of all TopicPartitions for the input topics
-     */
-    public abstract List<TopicPartition> getTopicPartitions(String... topics);
-
-    /**
-     * Get a list of TopicPartitions for the specified topics. This should be used to determine how
-     * to divide TopicPartitions between child tasks.
-     * @param topics list of topics to get partitions for
-     * @return list of all TopicPartitions for the input topics
-     */
-    public abstract List<TopicPartition> getTopicPartitions(List<String> topics);
 }
@@ -32,16 +32,11 @@ public abstract class SinkTask implements Task {

     /**
      * <p>
-     * The configuration key that provides the list of topic partitions that are inputs for this
+     * The configuration key that provides the list of topics that are inputs for this
      * SinkTask.
      * </p>
-     * <p>
-     * Usually this setting is only used by the Copycat framework since it manages the Kafka
-     * consumer that provides input records. However, it is provided here for the convenience of
-     * SinkTask implementations that may also want to know the input set of topic partitions.
-     * </p>
      */
-    public static final String TOPICPARTITIONS_CONFIG = "topic.partitions";
+    public static final String TOPICS_CONFIG = "topics";

     protected SinkTaskContext context;

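The effect of this key change on task configuration: previously each sink task received a pre-computed partition list under `topic.partitions`; now every task gets the same `topics` list and the consumer group decides which partitions each task actually owns. A before/after sketch, with values modeled on the test fixtures elsewhere in this diff:

```java
import java.util.Properties;

public class SinkTaskConfigSketch {
    public static void main(String[] args) {
        // Before this commit: the framework computed concrete topic-partitions per task.
        Properties before = new Properties();
        before.setProperty("topic.partitions", "topic1-1,topic2-2");

        // After it: every task gets the same topic list; the group coordinator
        // decides the actual partition split across tasks at rebalance time.
        Properties after = new Properties();
        after.setProperty("topics", "topic1,topic2");

        System.out.println(before + " -> " + after);
    }
}
```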
@@ -21,7 +21,6 @@ import org.apache.kafka.copycat.connector.ConnectorContext;
-import org.apache.kafka.copycat.connector.TopicPartition;
 import org.apache.kafka.copycat.errors.CopycatException;
 import org.apache.kafka.copycat.sink.SinkConnector;
 import org.easymock.EasyMock;
 import org.junit.Before;
 import org.junit.Test;
 import org.powermock.api.easymock.PowerMock;
@@ -84,9 +83,4 @@ public class FileStreamSinkConnectorTest {

         PowerMock.verifyAll();
     }
-
-    private void expectGetTopicPartitionsMultiple() {
-        EasyMock.expect(ctx.getTopicPartitions(MULTIPLE_TOPICS_LIST[0], MULTIPLE_TOPICS_LIST[1]))
-                .andReturn(MULTIPLE_TOPICS_PARTITIONS);
-    }
 }
@@ -20,7 +20,6 @@ package org.apache.kafka.copycat.file;
-import org.apache.kafka.copycat.connector.TopicPartition;
 import org.apache.kafka.copycat.errors.CopycatException;
 import org.apache.kafka.copycat.sink.SinkRecord;
 import org.apache.kafka.copycat.sink.SinkTask;
 import org.junit.Before;
 import org.junit.Test;

@@ -45,8 +44,6 @@ public class FileStreamSinkTaskTest {
         printStream = new PrintStream(os);
         task = new FileStreamSinkTask(printStream);
         config = new Properties();
-
-        config.setProperty(SinkTask.TOPICPARTITIONS_CONFIG, "topic1-1,topic2-2");
     }

     @Test
@@ -74,9 +74,8 @@ public class Copycat {
             FutureCallback cb = new FutureCallback(new Callback<Void>() {
                 @Override
                 public void onCompletion(Throwable error, Void result) {
-                    if (error != null) {
+                    if (error != null)
                         log.error("Failed to stop job {}", connName);
-                    }
                 }
             });
             coordinator.deleteConnector(connName, cb);
@@ -84,15 +83,13 @@ public class Copycat {
         }

         // Create any new connectors
-        for (final String connectorPropsFile : config
-                .getList(CopycatConfig.CREATE_CONNECTORS_CONFIG)) {
+        for (final String connectorPropsFile : config.getList(CopycatConfig.CREATE_CONNECTORS_CONFIG)) {
             connectorProps = Utils.loadProps(connectorPropsFile);
             FutureCallback cb = new FutureCallback(new Callback<String>() {
                 @Override
                 public void onCompletion(Throwable error, String id) {
-                    if (error != null) {
+                    if (error != null)
                         log.error("Failed to create job for {}", connectorPropsFile);
-                    }
                 }
             });
             coordinator.addConnector(connectorProps, cb);
@@ -37,30 +37,6 @@ public class WorkerConfig extends AbstractConfig {
             + "or instances may co-exist while sharing a single Kafka cluster.";
     public static final String CLUSTER_DEFAULT = "copycat";

-    public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect";
-    private static final String ZOOKEEPER_CONNECT_DOC =
-            "Specifies the ZooKeeper connection string in the form "
-            + "hostname:port where host and port are the host and port of a ZooKeeper server. To allow connecting "
-            + "through other ZooKeeper nodes when that ZooKeeper machine is down you can also specify multiple hosts "
-            + "in the form hostname1:port1,hostname2:port2,hostname3:port3.\n"
-            + "\n"
-            + "The server may also have a ZooKeeper chroot path as part of it's ZooKeeper connection string which puts "
-            + "its data under some path in the global ZooKeeper namespace. If so the consumer should use the same "
-            + "chroot path in its connection string. For example to give a chroot path of /chroot/path you would give "
-            + "the connection string as hostname1:port1,hostname2:port2,hostname3:port3/chroot/path.";
-    public static final String ZOOKEEPER_CONNECT_DEFAULT = "localhost:2181";
-
-    public static final String ZOOKEEPER_SESSION_TIMEOUT_MS_CONFIG = "zookeeper.session.timeout.ms";
-    private static final String ZOOKEEPER_SESSION_TIMEOUT_MS_DOC
-            = "Session timeout for ZooKeeper connections.";
-    public static final String ZOOKEEPER_SESSION_TIMEOUT_MS_DEFAULT = "30000";
-
-    public static final String ZOOKEEPER_CONNECTION_TIMEOUT_MS_CONFIG
-            = "zookeeper.session.connection.ms";
-    private static final String ZOOKEEPER_CONNECTION_TIMEOUT_MS_DOC
-            = "Connection timeout for ZooKeeper.";
-    public static final String ZOOKEEPER_CONNECTION_TIMEOUT_MS_DEFAULT = "30000";
-
     public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
     public static final String BOOSTRAP_SERVERS_DOC
             = "A list of host/port pairs to use for establishing the initial connection to the Kafka "
@@ -159,14 +135,6 @@ public class WorkerConfig extends AbstractConfig {
     static {
         config = new ConfigDef()
                 .define(CLUSTER_CONFIG, Type.STRING, CLUSTER_DEFAULT, Importance.HIGH, CLUSTER_CONFIG_DOC)
-                .define(ZOOKEEPER_CONNECT_CONFIG, Type.STRING, ZOOKEEPER_CONNECT_DEFAULT,
-                        Importance.HIGH, ZOOKEEPER_CONNECT_DOC)
-                .define(ZOOKEEPER_SESSION_TIMEOUT_MS_CONFIG, Type.INT,
-                        ZOOKEEPER_SESSION_TIMEOUT_MS_DEFAULT,
-                        Importance.LOW, ZOOKEEPER_SESSION_TIMEOUT_MS_DOC)
-                .define(ZOOKEEPER_CONNECTION_TIMEOUT_MS_CONFIG, Type.INT,
-                        ZOOKEEPER_CONNECTION_TIMEOUT_MS_DEFAULT,
-                        Importance.LOW, ZOOKEEPER_CONNECTION_TIMEOUT_MS_DOC)
                 .define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, BOOTSTRAP_SERVERS_DEFAULT,
                         Importance.HIGH, BOOSTRAP_SERVERS_DOC)
                 .define(CONVERTER_CLASS_CONFIG, Type.CLASS, CONVERTER_CLASS_DEFAULT,
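For reference, the `ConfigDef` builder shown above comes from `org.apache.kafka.common.config`. A stripped-down sketch of the same pattern after the ZooKeeper keys are gone (the class name and doc string are hypothetical, not from the commit):

```java
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

import java.util.Properties;

// Hypothetical mini-config: after this commit a worker only needs broker
// bootstrap settings, not a ZooKeeper connection string.
public class MiniWorkerConfig extends AbstractConfig {
    private static final ConfigDef CONFIG = new ConfigDef()
            .define("bootstrap.servers", Type.LIST, "localhost:9092",
                    Importance.HIGH, "Initial Kafka brokers to contact.");

    public MiniWorkerConfig(Properties props) {
        super(CONFIG, props); // validates and applies defaults
    }

    public static void main(String[] args) {
        MiniWorkerConfig cfg = new MiniWorkerConfig(new Properties());
        System.out.println(cfg.getList("bootstrap.servers"));
    }
}
```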
@@ -19,8 +19,6 @@ package org.apache.kafka.copycat.runtime;

 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
-import kafka.utils.ZKStringSerializer$;
-import org.I0Itec.zkclient.ZkClient;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.utils.Utils;
@@ -62,7 +60,6 @@ public class Worker {
     private OffsetDeserializer offsetValueDeserializer;
     private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<ConnectorTaskId, WorkerTask>();
     private KafkaProducer producer;
-    private ZkClient zkClient;
     private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;

     public Worker(WorkerConfig config) {
@@ -70,14 +67,12 @@ public class Worker {
                 Reflection.instantiateConfigurable(
                         config.getClass(WorkerConfig.OFFSET_STORAGE_CLASS_CONFIG).getName(),
                         OffsetBackingStore.class, config.getUnusedProperties()),
-                null, null, null, null,
-                createZkClient(config));
+                null, null, null, null);
     }

     public Worker(Time time, WorkerConfig config, OffsetBackingStore offsetBackingStore,
                   OffsetSerializer offsetKeySerializer, OffsetSerializer offsetValueSerializer,
-                  OffsetDeserializer offsetKeyDeserializer, OffsetDeserializer offsetValueDeserializer,
-                  ZkClient zkClient) {
+                  OffsetDeserializer offsetKeyDeserializer, OffsetDeserializer offsetValueDeserializer) {
         this.time = time;
         this.config = config;
         this.converter = Reflection.instantiate(config.getClass(WorkerConfig.CONVERTER_CLASS_CONFIG).getName(),
@@ -119,16 +114,6 @@ public class Worker {
                     OffsetDeserializer.class);
             this.offsetValueDeserializer.configure(config.getOriginalProperties(), false);
         }
-
-        this.zkClient = zkClient;
-
     }
-
-    private static ZkClient createZkClient(WorkerConfig config) {
-        return new ZkClient(config.getString(WorkerConfig.ZOOKEEPER_CONNECT_CONFIG),
-                config.getInt(WorkerConfig.ZOOKEEPER_SESSION_TIMEOUT_MS_CONFIG),
-                config.getInt(WorkerConfig.ZOOKEEPER_CONNECTION_TIMEOUT_MS_CONFIG),
-                ZKStringSerializer$.MODULE$);
-    }

     public void start() {
@@ -245,10 +230,6 @@ public class Worker {
         tasks.remove(id);
     }

-    public ZkClient getZkClient() {
-        return zkClient;
-    }
-
     private WorkerTask getTask(ConnectorTaskId id) {
         WorkerTask task = tasks.get(id);
         if (task == null) {
@@ -157,6 +157,12 @@ public class WorkerSinkTask implements WorkerTask {
     }

     private KafkaConsumer<Object, Object> createConsumer(Properties taskProps) {
+        String topicsStr = taskProps.getProperty(SinkTask.TOPICS_CONFIG);
+        if (topicsStr == null || topicsStr.isEmpty()) {
+            throw new CopycatRuntimeException("Sink tasks require a list of topics.");
+        }
+        String[] topics = topicsStr.split(",");
+
         // Include any unknown worker configs so consumer configs can be set globally on the worker
         // and through to the task
         Properties props = workerConfig.getUnusedProperties();
@@ -177,17 +183,21 @@ public class WorkerSinkTask implements WorkerTask {
             throw new CopycatRuntimeException("Failed to create consumer", t);
         }

-        List<TopicPartition> topicPartitions = getInputTopicPartitions(taskProps);
-        log.debug("Task {} subscribing to topic-partitions {}", id, topicPartitions);
+        log.debug("Task {} subscribing to topics {}", id, topics);
+        newConsumer.subscribe(topics);

         // Seek to any user-provided offsets. This is useful if offsets are tracked in the downstream system (e.g., to
         // enable exactly once delivery to that system).
+        //
+        // To do this correctly, we need to first make sure we have been assigned partitions, which poll() will guarantee.
+        // We ask for offsets after this poll to make sure any offsets committed before the rebalance are picked up correctly.
+        newConsumer.poll(0);
         Map<TopicPartition, Long> offsets = context.getOffsets();
-        for (TopicPartition tp : topicPartitions) {
-            org.apache.kafka.common.TopicPartition kafkatp = new
-                    org.apache.kafka.common.TopicPartition(tp.topic(), tp.partition());
-            newConsumer.subscribe(kafkatp);
-            if (offsets.containsKey(tp)) {
-                long offset = offsets.get(tp);
-                newConsumer.seek(kafkatp, offset);
-            }
+        for (org.apache.kafka.common.TopicPartition kafkatp : newConsumer.subscriptions()) {
+            TopicPartition tp = new TopicPartition(kafkatp.topic(), kafkatp.partition());
+            Long offset = offsets.get(tp);
+            if (offset != null)
+                newConsumer.seek(kafkatp, offset);
         }
         return newConsumer;
     }
@@ -196,24 +206,6 @@ public class WorkerSinkTask implements WorkerTask {
         return new WorkerSinkTaskThread(this, "WorkerSinkTask-" + id, time, workerConfig);
     }

-    private List<TopicPartition> getInputTopicPartitions(Properties taskProps) {
-        String topicPartitionsStr = taskProps.getProperty(SinkTask.TOPICPARTITIONS_CONFIG);
-        if (topicPartitionsStr == null || topicPartitionsStr.isEmpty()) {
-            throw new CopycatRuntimeException("Sink tasks require a list of topic partitions, which "
-                    + "copycat should generate automatically. This might "
-                    + "indicate your Task class inherits from SinkTask, but "
-                    + "your Connector class does not inherit from "
-                    + "SinkConnector.");
-        }
-
-        List<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
-        for (String topicPartitionStr : Arrays.asList(topicPartitionsStr.split(","))) {
-            topicPartitions.add(new TopicPartition(topicPartitionStr));
-        }
-        return topicPartitions;
-    }
-
-
     private void deliverMessages(ConsumerRecords<Object, Object> msgs) {
         // Finally, deliver this batch to the sink
         if (msgs.count() > 0) {
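The poll-before-seek ordering above matters: under group management the consumer owns no partitions until it has joined the group, which the first `poll()` guarantees. A standalone sketch of the same technique against the current client API, using `assignment()` where the in-flight client in this diff exposed `subscriptions()` (the topic name is illustrative):

```java
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Map;

public class SeekToStoredOffsets {
    // Rewind a group-managed consumer to offsets tracked in an external system.
    // 'stored' maps partitions to the next offset to read; partitions without an
    // entry keep the position that committed offsets / auto.offset.reset gave them.
    static void seekToStored(KafkaConsumer<byte[], byte[]> consumer,
                             Map<TopicPartition, Long> stored) {
        consumer.subscribe(Arrays.asList("some-topic")); // illustrative topic name
        consumer.poll(0);                                // join the group, obtain an assignment
        for (TopicPartition tp : consumer.assignment()) {
            Long offset = stored.get(tp);
            if (offset != null)
                consumer.seek(tp, offset);               // override the committed position
        }
    }
}
```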
@@ -17,12 +17,7 @@

 package org.apache.kafka.copycat.runtime.standalone;

-import org.I0Itec.zkclient.ZkClient;
 import org.apache.kafka.copycat.connector.ConnectorContext;
-import org.apache.kafka.copycat.connector.TopicPartition;
-import org.apache.kafka.copycat.util.KafkaUtils;
-
-import java.util.List;

 /**
  * ConnectorContext for use with the StandaloneCoordinator, which maintains all connectors and tasks
@@ -32,13 +27,10 @@ public class StandaloneConnectorContext implements ConnectorContext {

     private StandaloneCoordinator coordinator;
     private String connectorName;
-    private ZkClient zkClient;

-    public StandaloneConnectorContext(StandaloneCoordinator coordinator, String connectorName,
-                                      ZkClient zkClient) {
+    public StandaloneConnectorContext(StandaloneCoordinator coordinator, String connectorName) {
         this.coordinator = coordinator;
         this.connectorName = connectorName;
-        this.zkClient = zkClient;
     }

     @Override
@@ -47,14 +39,4 @@ public class StandaloneConnectorContext implements ConnectorContext {
         // process
         coordinator.requestTaskReconfiguration(connectorName);
     }
-
-    @Override
-    public List<TopicPartition> getTopicPartitions(String... topics) {
-        return KafkaUtils.getTopicPartitions(zkClient, topics);
-    }
-
-    @Override
-    public List<TopicPartition> getTopicPartitions(List<String> topics) {
-        return KafkaUtils.getTopicPartitions(zkClient, topics);
-    }
 }
@@ -19,7 +19,6 @@ package org.apache.kafka.copycat.runtime.standalone;

 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.copycat.connector.Connector;
-import org.apache.kafka.copycat.connector.TopicPartition;
 import org.apache.kafka.copycat.errors.CopycatException;
 import org.apache.kafka.copycat.errors.CopycatRuntimeException;
 import org.apache.kafka.copycat.runtime.ConnectorConfig;
@@ -141,7 +140,7 @@ public class StandaloneCoordinator implements Coordinator {
             // may be caused by user code
             throw new CopycatRuntimeException("Failed to create connector instance", t);
         }
-        connector.initialize(new StandaloneConnectorContext(this, connName, worker.getZkClient()));
+        connector.initialize(new StandaloneConnectorContext(this, connName));
         try {
             connector.start(configs);
         } catch (CopycatException e) {
@@ -194,32 +193,21 @@ public class StandaloneCoordinator implements Coordinator {

         log.info("Creating tasks for connector {} of type {}", state.name, taskClassName);

-        int maxTasks = state.maxTasks;
-        // For sink tasks, we may also be limited by the number of input topic partitions
-        List<TopicPartition> topicPartitions = null;
-        if (state.connector instanceof SinkConnector) {
-            topicPartitions = KafkaUtils.getTopicPartitions(worker.getZkClient(), state.inputTopics);
-            maxTasks = Math.min(maxTasks, topicPartitions.size());
-        }
-        List<Properties> taskConfigs = state.connector.getTaskConfigs(maxTasks);
-
-        // If necessary, figure out how to distribute input topic partitions
-        // TODO: This needs to run periodically so we detect new partitions
-        List<List<TopicPartition>> taskAssignments = null;
-        if (state.connector instanceof SinkConnector) {
-            taskAssignments = ConnectorUtils.groupPartitions(topicPartitions, taskConfigs.size());
-        }
+        List<Properties> taskConfigs = state.connector.getTaskConfigs(state.maxTasks);

         // Generate the final configs, including framework provided settings
         Map<ConnectorTaskId, Properties> taskProps = new HashMap<ConnectorTaskId, Properties>();
         for (int i = 0; i < taskConfigs.size(); i++) {
             ConnectorTaskId taskId = new ConnectorTaskId(state.name, i);
             Properties config = taskConfigs.get(i);
+            // TODO: This probably shouldn't be in the Coordinator. It's nice to have Copycat ensure the list of topics
+            // is automatically provided to tasks since it is required by the framework, but this
+            String subscriptionTopics = Utils.join(state.inputTopics, ",");
             if (state.connector instanceof SinkConnector) {
                 // Make sure we don't modify the original since the connector may reuse it internally
                 Properties configForSink = new Properties();
                 configForSink.putAll(config);
-                configForSink.setProperty(SinkTask.TOPICPARTITIONS_CONFIG, Utils.join(taskAssignments.get(i), ","));
+                configForSink.setProperty(SinkTask.TOPICS_CONFIG, subscriptionTopics);
                 config = configForSink;
             }
             taskProps.put(taskId, config);
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.util;
-
-import kafka.utils.ZkUtils;
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.kafka.copycat.connector.TopicPartition;
-import scala.collection.Seq;
-import scala.collection.mutable.Map;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import static scala.collection.JavaConversions.*;
-
-/**
- * Some utilities for working with Kafka that aren't included with Kafka itself.
- */
-public class KafkaUtils {
-
-    public static List<TopicPartition> getTopicPartitions(ZkClient zkClient, String... topics) {
-        return getTopicPartitions(zkClient, Arrays.asList(topics));
-    }
-
-    public static List<TopicPartition> getTopicPartitions(ZkClient zkClient, List<String> topics) {
-        Seq<String> scalaTopics = asScalaIterable(topics).toSeq();
-        List<TopicPartition> result = new ArrayList<TopicPartition>();
-        Map<String, Seq<Object>> partitionsByTopic
-                = ZkUtils.getPartitionsForTopics(zkClient, scalaTopics);
-        for (java.util.Map.Entry<String, Seq<Object>> entry : asJavaMap(partitionsByTopic).entrySet()) {
-            for (Object partition : asJavaIterable(entry.getValue())) {
-                result.add(new TopicPartition(entry.getKey(), (Integer) partition));
-            }
-        }
-        return result;
-    }
-}
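With assignment delegated to the group coordinator, this ZooKeeper-and-Scala bridge has no remaining callers and is deleted outright. Were partition metadata still needed, the plain Java client could supply it without touching ZooKeeper; a hedged sketch of that alternative (not part of this commit):

```java
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

import java.util.ArrayList;
import java.util.List;

public class PartitionLookup {
    // Fetch topic-partitions from the brokers themselves: the consumer's
    // metadata API replaces ZkUtils.getPartitionsForTopics entirely.
    public static List<TopicPartition> topicPartitions(
            KafkaConsumer<?, ?> consumer, List<String> topics) {
        List<TopicPartition> result = new ArrayList<>();
        for (String topic : topics) {
            for (PartitionInfo info : consumer.partitionsFor(topic))
                result.add(new TopicPartition(info.topic(), info.partition()));
        }
        return result;
    }
}
```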
@@ -25,7 +25,6 @@ import org.apache.kafka.copycat.data.GenericRecord;
 import org.apache.kafka.copycat.data.GenericRecordBuilder;
 import org.apache.kafka.copycat.data.Schema;
 import org.apache.kafka.copycat.data.SchemaBuilder;
-import org.apache.kafka.copycat.errors.CopycatRuntimeException;
 import org.apache.kafka.copycat.sink.SinkRecord;
 import org.apache.kafka.copycat.sink.SinkTask;
 import org.apache.kafka.copycat.sink.SinkTaskContext;
@@ -88,21 +87,9 @@ public class WorkerSinkTaskTest extends ThreadedTest {
         recordsReturned = 0;
     }

-    @Test
-    public void testGetInputTopicPartitions() throws Exception {
-        Properties props = new Properties();
-        props.setProperty(SinkTask.TOPICPARTITIONS_CONFIG, "topic-1,foo-2");
-        assertEquals(
-                Arrays.asList(new org.apache.kafka.copycat.connector.TopicPartition("topic", 1),
-                        new org.apache.kafka.copycat.connector.TopicPartition("foo", 2)),
-                Whitebox.invokeMethod(workerTask, "getInputTopicPartitions", props)
-        );
-    }
-
     @Test
     public void testPollsInBackground() throws Exception {
         Properties taskProps = new Properties();
-        taskProps.setProperty(SinkTask.TOPICPARTITIONS_CONFIG, TOPIC_PARTITION_STR);

         expectInitializeTask(taskProps);
         Capture<Collection<SinkRecord>> capturedRecords = expectPolls(1L);
@@ -170,7 +157,6 @@ public class WorkerSinkTaskTest extends ThreadedTest {
     @Test
     public void testCommit() throws Exception {
         Properties taskProps = new Properties();
-        taskProps.setProperty(SinkTask.TOPICPARTITIONS_CONFIG, TOPIC_PARTITION_STR);

         expectInitializeTask(taskProps);
         // Make each poll() take the offset commit interval
@@ -199,7 +185,6 @@ public class WorkerSinkTaskTest extends ThreadedTest {
     @Test
     public void testCommitTaskFlushFailure() throws Exception {
         Properties taskProps = new Properties();
-        taskProps.setProperty(SinkTask.TOPICPARTITIONS_CONFIG, TOPIC_PARTITION_STR);

         expectInitializeTask(taskProps);
         Capture<Collection<SinkRecord>> capturedRecords
@@ -224,7 +209,6 @@ public class WorkerSinkTaskTest extends ThreadedTest {
     @Test
     public void testCommitConsumerFailure() throws Exception {
         Properties taskProps = new Properties();
-        taskProps.setProperty(SinkTask.TOPICPARTITIONS_CONFIG, TOPIC_PARTITION_STR);

         expectInitializeTask(taskProps);
         Capture<Collection<SinkRecord>> capturedRecords
@@ -250,7 +234,6 @@ public class WorkerSinkTaskTest extends ThreadedTest {
     @Test
     public void testCommitTimeout() throws Exception {
         Properties taskProps = new Properties();
-        taskProps.setProperty(SinkTask.TOPICPARTITIONS_CONFIG, TOPIC_PARTITION_STR);

         expectInitializeTask(taskProps);
         // Cut down amount of time to pass in each poll so we trigger exactly 1 offset commit
@@ -277,34 +260,6 @@ public class WorkerSinkTaskTest extends ThreadedTest {
         PowerMock.verifyAll();
     }

-    @Test
-    public void testGetInputPartitionsSingle() throws Exception {
-        Properties taskProps = new Properties();
-        taskProps.setProperty(SinkTask.TOPICPARTITIONS_CONFIG, "test-1");
-
-        assertEquals(Arrays.asList(new org.apache.kafka.copycat.connector.TopicPartition("test", 1)),
-                Whitebox.invokeMethod(workerTask, "getInputTopicPartitions", taskProps));
-    }
-
-    @Test
-    public void testGetInputPartitionsList() throws Exception {
-        Properties taskProps = new Properties();
-        taskProps.setProperty(SinkTask.TOPICPARTITIONS_CONFIG, "test-1,foo-2,bar-3");
-
-        assertEquals(Arrays.asList(
-                new org.apache.kafka.copycat.connector.TopicPartition("test", 1),
-                new org.apache.kafka.copycat.connector.TopicPartition("foo", 2),
-                new org.apache.kafka.copycat.connector.TopicPartition("bar", 3)),
-                Whitebox.invokeMethod(workerTask, "getInputTopicPartitions", taskProps));
-    }
-
-    @Test(expected = CopycatRuntimeException.class)
-    public void testGetInputPartitionsMissing() throws Exception {
-        // Missing setting
-        Whitebox.invokeMethod(workerTask, "getInputTopicPartitions", new Properties());
-    }
-
-
     private KafkaConsumer<Object, Object> expectInitializeTask(Properties taskProps)
             throws Exception {
         sinkTask.initialize(EasyMock.anyObject(SinkTaskContext.class));
@@ -18,7 +18,6 @@
 package org.apache.kafka.copycat.runtime;

 import org.apache.kafka.common.utils.Time;
-import org.I0Itec.zkclient.ZkClient;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.copycat.cli.WorkerConfig;
 import org.apache.kafka.copycat.errors.CopycatException;
@@ -62,11 +61,9 @@ public class WorkerTest extends ThreadedTest {
         Properties workerProps = new Properties();
         workerProps.setProperty("schema.registry.url", "http://localhost:8081");
         WorkerConfig config = new WorkerConfig(workerProps);
-        ZkClient zkClient = PowerMock.createMock(ZkClient.class);
         worker = new Worker(new MockTime(), config, offsetBackingStore,
                 offsetKeySerializer, offsetValueSerializer,
-                offsetKeyDeserializer, offsetValueDeserializer,
-                zkClient);
+                offsetKeyDeserializer, offsetValueDeserializer);
         worker.start();
     }

@@ -21,7 +21,6 @@ import org.apache.kafka.copycat.runtime.ConnectorConfig;
 import org.apache.kafka.copycat.runtime.Worker;
 import org.apache.kafka.copycat.sink.SinkConnector;
 import org.apache.kafka.copycat.util.Callback;
-import org.apache.kafka.copycat.util.KafkaUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -35,7 +34,7 @@ import java.io.File;
 import java.util.Properties;

 @RunWith(PowerMockRunner.class)
-@PrepareForTest({StandaloneCoordinator.class, KafkaUtils.class})
+@PrepareForTest({StandaloneCoordinator.class})
 @PowerMockIgnore("javax.management.*")
 public class StandaloneCoordinatorRestoreTest extends StandaloneCoordinatorTestBase {
     private File coordinatorConfigFile;
@@ -57,7 +56,6 @@ public class StandaloneCoordinatorRestoreTest extends StandaloneCoordinatorTestB
         connectorProps.setProperty(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
         connectorProps.setProperty(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR);
         PowerMock.mockStatic(StandaloneCoordinator.class);
-        PowerMock.mockStatic(KafkaUtils.class);

         // These can be anything since connectors can pass along whatever they want.
         taskProps = new Properties();
@@ -22,7 +22,6 @@ import org.apache.kafka.copycat.runtime.Worker;
 import org.apache.kafka.copycat.sink.SinkConnector;
 import org.apache.kafka.copycat.util.Callback;
 import org.apache.kafka.copycat.util.FutureCallback;
-import org.apache.kafka.copycat.util.KafkaUtils;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -35,7 +34,7 @@ import java.util.Properties;
 import java.util.concurrent.TimeUnit;

 @RunWith(PowerMockRunner.class)
-@PrepareForTest({StandaloneCoordinator.class, KafkaUtils.class})
+@PrepareForTest({StandaloneCoordinator.class})
 @PowerMockIgnore("javax.management.*")
 public class StandaloneCoordinatorTest extends StandaloneCoordinatorTestBase {

@@ -49,7 +48,6 @@ public class StandaloneCoordinatorTest extends StandaloneCoordinatorTestBase {
         connectorProps.setProperty(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
         connectorProps.setProperty(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR);
         PowerMock.mockStatic(StandaloneCoordinator.class);
-        PowerMock.mockStatic(KafkaUtils.class);

         // These can be anything since connectors can pass along whatever they want.
         taskProps = new Properties();
@@ -17,7 +17,6 @@

 package org.apache.kafka.copycat.runtime.standalone;

-import org.I0Itec.zkclient.ZkClient;
 import org.apache.kafka.copycat.connector.Connector;
 import org.apache.kafka.copycat.connector.Task;
 import org.apache.kafka.copycat.connector.TopicPartition;
@@ -30,22 +29,16 @@ import org.apache.kafka.copycat.source.SourceConnector;
 import org.apache.kafka.copycat.source.SourceTask;
 import org.apache.kafka.copycat.util.Callback;
 import org.apache.kafka.copycat.util.ConnectorTaskId;
-import org.apache.kafka.copycat.util.KafkaUtils;
 import org.easymock.EasyMock;
 import org.powermock.api.easymock.PowerMock;

 import java.util.Arrays;
-import java.util.List;
 import java.util.Properties;

 public class StandaloneCoordinatorTestBase {

     protected static final String CONNECTOR_NAME = "test";
     protected static final String TOPICS_LIST_STR = "topic1,topic2";
-    protected static final List<String> TOPICS_LIST = Arrays.asList("topic1", "topic2");
-    protected static final List<TopicPartition> TOPIC_PARTITIONS = Arrays.asList(
-            new TopicPartition("topic1", 1), new TopicPartition("topic2", 1));
-    protected static final String TOPIC_PARTITIONS_STR = "topic1-1,topic2-1";

     protected StandaloneCoordinator coordinator;
     protected Worker worker;
@@ -79,8 +72,6 @@ public class StandaloneCoordinatorTestBase {
             PowerMock.expectLastCall();
         }

-        ZkClient zkClient = PowerMock.createMock(ZkClient.class);
-        EasyMock.expect(worker.getZkClient()).andStubReturn(zkClient);
         connector.initialize(EasyMock.anyObject(StandaloneConnectorContext.class));
         PowerMock.expectLastCall();
         connector.start(new Properties());
@@ -89,20 +80,14 @@ public class StandaloneCoordinatorTestBase {
         // Just return the connector properties for the individual task we generate by default
         EasyMock.<Class<? extends Task>>expect(connector.getTaskClass()).andReturn(taskClass);

-        if (sink) {
-            EasyMock.expect(KafkaUtils.getTopicPartitions(zkClient, TOPICS_LIST))
-                    .andReturn(TOPIC_PARTITIONS);
-        }
-
         EasyMock.expect(connector.getTaskConfigs(ConnectorConfig.TASKS_MAX_DEFAULT))
                 .andReturn(Arrays.asList(taskProps));
         // And we should instantiate the tasks. For a sink task, we should see added properties for
         // the input topic partitions
         Properties generatedTaskProps = new Properties();
         generatedTaskProps.putAll(taskProps);
-        if (sink) {
-            generatedTaskProps.setProperty(SinkTask.TOPICPARTITIONS_CONFIG, TOPIC_PARTITIONS_STR);
-        }
+        if (sink)
+            generatedTaskProps.setProperty(SinkTask.TOPICS_CONFIG, TOPICS_LIST_STR);
         worker.addTask(new ConnectorTaskId(CONNECTOR_NAME, 0), taskClass.getName(), generatedTaskProps);
         PowerMock.expectLastCall();
     }