mirror of https://github.com/apache/kafka.git
Switch to using new consumer coordinator instead of manually assigning partitions. Remove dependency of copycat-runtime on core.
This commit is contained in:
parent
4a9b4f3c67
commit
dec137998a
|
@ -604,8 +604,6 @@ project(':copycat-runtime') {
|
||||||
compile project(':copycat-avro')
|
compile project(':copycat-avro')
|
||||||
compile project(':clients')
|
compile project(':clients')
|
||||||
compile "org.slf4j:slf4j-api:1.7.6"
|
compile "org.slf4j:slf4j-api:1.7.6"
|
||||||
// FIXME we shouldn't depend on core since we only need clients, but currently we're doing sink task topic-partition assignment manually until we switch over to new consumer group functionality
|
|
||||||
compile project(':core')
|
|
||||||
|
|
||||||
testCompile 'junit:junit:4.6'
|
testCompile 'junit:junit:4.6'
|
||||||
testCompile 'org.easymock:easymock:3.3.1'
|
testCompile 'org.easymock:easymock:3.3.1'
|
||||||
|
|
|
@ -135,10 +135,6 @@
|
||||||
<!-- for tests -->
|
<!-- for tests -->
|
||||||
<allow pkg="org.easymock" />
|
<allow pkg="org.easymock" />
|
||||||
<allow pkg="org.powermock" />
|
<allow pkg="org.powermock" />
|
||||||
|
|
||||||
<!-- FIXME Remove when dependency on core for sink partition assignment is removed -->
|
|
||||||
<allow pkg="kafka.utils" />
|
|
||||||
<allow pkg="org.I0Itec.zkclient" />
|
|
||||||
</subpackage>
|
</subpackage>
|
||||||
|
|
||||||
<subpackage name="cli">
|
<subpackage name="cli">
|
||||||
|
@ -156,11 +152,6 @@
|
||||||
|
|
||||||
<subpackage name="util">
|
<subpackage name="util">
|
||||||
<allow pkg="org.apache.kafka.copycat" />
|
<allow pkg="org.apache.kafka.copycat" />
|
||||||
|
|
||||||
<!-- FIXME Remove when dependency on core for sink partition assignment is removed -->
|
|
||||||
<allow pkg="kafka.utils" />
|
|
||||||
<allow pkg="org.I0Itec.zkclient" />
|
|
||||||
<allow pkg="scala" />
|
|
||||||
</subpackage>
|
</subpackage>
|
||||||
|
|
||||||
<subpackage name="file">
|
<subpackage name="file">
|
||||||
|
|
|
@ -13,7 +13,6 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
zookeeper.connect=localhost:2181
|
|
||||||
bootstrap.servers=localhost:9092
|
bootstrap.servers=localhost:9092
|
||||||
schema.registry.url=http://localhost:8081
|
schema.registry.url=http://localhost:8081
|
||||||
|
|
||||||
|
|
|
@ -14,6 +14,5 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
# These are defaults. This file just demonstrates how to override some settings.
|
# These are defaults. This file just demonstrates how to override some settings.
|
||||||
zookeeper.connect=localhost:2181
|
|
||||||
bootstrap.servers=localhost:9092
|
bootstrap.servers=localhost:9092
|
||||||
schema.registry.url=http://localhost:8081
|
schema.registry.url=http://localhost:8081
|
|
@ -17,8 +17,6 @@
|
||||||
|
|
||||||
package org.apache.kafka.copycat.connector;
|
package org.apache.kafka.copycat.connector;
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* ConnectorContext allows Connectors to proactively interact with the Copycat runtime.
|
* ConnectorContext allows Connectors to proactively interact with the Copycat runtime.
|
||||||
*/
|
*/
|
||||||
|
@ -29,21 +27,4 @@ public interface ConnectorContext {
|
||||||
* added/removed) and the running Tasks will need to be modified.
|
* added/removed) and the running Tasks will need to be modified.
|
||||||
*/
|
*/
|
||||||
public void requestTaskReconfiguration();
|
public void requestTaskReconfiguration();
|
||||||
|
|
||||||
/**
|
|
||||||
* Get a list of TopicPartitions for the specified topics. This should be used to determine how
|
|
||||||
* to divide TopicPartitions between child tasks.
|
|
||||||
* @param topics list of topics to get partitions for
|
|
||||||
* @return list of all TopicPartitions for the input topics
|
|
||||||
*/
|
|
||||||
public abstract List<TopicPartition> getTopicPartitions(String... topics);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get a list of TopicPartitions for the specified topics. This should be used to determine how
|
|
||||||
* to divide TopicPartitions between child tasks.
|
|
||||||
* @param topics list of topics to get partitions for
|
|
||||||
* @return list of all TopicPartitions for the input topics
|
|
||||||
*/
|
|
||||||
public abstract List<TopicPartition> getTopicPartitions(List<String> topics);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,16 +32,11 @@ public abstract class SinkTask implements Task {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* <p>
|
||||||
* The configuration key that provides the list of topic partitions that are inputs for this
|
* The configuration key that provides the list of topics that are inputs for this
|
||||||
* SinkTask.
|
* SinkTask.
|
||||||
* </p>
|
* </p>
|
||||||
* <p>
|
|
||||||
* Usually this setting is only used by the Copycat framework since it manages the Kafka
|
|
||||||
* consumer that provides input records. However, it is provided here for the convenience of
|
|
||||||
* SinkTask implementations that may also want to know the input set of topic partitions.
|
|
||||||
* </p>
|
|
||||||
*/
|
*/
|
||||||
public static final String TOPICPARTITIONS_CONFIG = "topic.partitions";
|
public static final String TOPICS_CONFIG = "topics";
|
||||||
|
|
||||||
protected SinkTaskContext context;
|
protected SinkTaskContext context;
|
||||||
|
|
||||||
|
|
|
@ -21,7 +21,6 @@ import org.apache.kafka.copycat.connector.ConnectorContext;
|
||||||
import org.apache.kafka.copycat.connector.TopicPartition;
|
import org.apache.kafka.copycat.connector.TopicPartition;
|
||||||
import org.apache.kafka.copycat.errors.CopycatException;
|
import org.apache.kafka.copycat.errors.CopycatException;
|
||||||
import org.apache.kafka.copycat.sink.SinkConnector;
|
import org.apache.kafka.copycat.sink.SinkConnector;
|
||||||
import org.easymock.EasyMock;
|
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.powermock.api.easymock.PowerMock;
|
import org.powermock.api.easymock.PowerMock;
|
||||||
|
@ -84,9 +83,4 @@ public class FileStreamSinkConnectorTest {
|
||||||
|
|
||||||
PowerMock.verifyAll();
|
PowerMock.verifyAll();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void expectGetTopicPartitionsMultiple() {
|
|
||||||
EasyMock.expect(ctx.getTopicPartitions(MULTIPLE_TOPICS_LIST[0], MULTIPLE_TOPICS_LIST[1]))
|
|
||||||
.andReturn(MULTIPLE_TOPICS_PARTITIONS);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.kafka.copycat.file;
|
||||||
import org.apache.kafka.copycat.connector.TopicPartition;
|
import org.apache.kafka.copycat.connector.TopicPartition;
|
||||||
import org.apache.kafka.copycat.errors.CopycatException;
|
import org.apache.kafka.copycat.errors.CopycatException;
|
||||||
import org.apache.kafka.copycat.sink.SinkRecord;
|
import org.apache.kafka.copycat.sink.SinkRecord;
|
||||||
import org.apache.kafka.copycat.sink.SinkTask;
|
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -45,8 +44,6 @@ public class FileStreamSinkTaskTest {
|
||||||
printStream = new PrintStream(os);
|
printStream = new PrintStream(os);
|
||||||
task = new FileStreamSinkTask(printStream);
|
task = new FileStreamSinkTask(printStream);
|
||||||
config = new Properties();
|
config = new Properties();
|
||||||
|
|
||||||
config.setProperty(SinkTask.TOPICPARTITIONS_CONFIG, "topic1-1,topic2-2");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -74,9 +74,8 @@ public class Copycat {
|
||||||
FutureCallback cb = new FutureCallback(new Callback<Void>() {
|
FutureCallback cb = new FutureCallback(new Callback<Void>() {
|
||||||
@Override
|
@Override
|
||||||
public void onCompletion(Throwable error, Void result) {
|
public void onCompletion(Throwable error, Void result) {
|
||||||
if (error != null) {
|
if (error != null)
|
||||||
log.error("Failed to stop job {}", connName);
|
log.error("Failed to stop job {}", connName);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
coordinator.deleteConnector(connName, cb);
|
coordinator.deleteConnector(connName, cb);
|
||||||
|
@ -84,15 +83,13 @@ public class Copycat {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create any new connectors
|
// Create any new connectors
|
||||||
for (final String connectorPropsFile : config
|
for (final String connectorPropsFile : config.getList(CopycatConfig.CREATE_CONNECTORS_CONFIG)) {
|
||||||
.getList(CopycatConfig.CREATE_CONNECTORS_CONFIG)) {
|
|
||||||
connectorProps = Utils.loadProps(connectorPropsFile);
|
connectorProps = Utils.loadProps(connectorPropsFile);
|
||||||
FutureCallback cb = new FutureCallback(new Callback<String>() {
|
FutureCallback cb = new FutureCallback(new Callback<String>() {
|
||||||
@Override
|
@Override
|
||||||
public void onCompletion(Throwable error, String id) {
|
public void onCompletion(Throwable error, String id) {
|
||||||
if (error != null) {
|
if (error != null)
|
||||||
log.error("Failed to create job for {}", connectorPropsFile);
|
log.error("Failed to create job for {}", connectorPropsFile);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
coordinator.addConnector(connectorProps, cb);
|
coordinator.addConnector(connectorProps, cb);
|
||||||
|
|
|
@ -37,30 +37,6 @@ public class WorkerConfig extends AbstractConfig {
|
||||||
+ "or instances may co-exist while sharing a single Kafka cluster.";
|
+ "or instances may co-exist while sharing a single Kafka cluster.";
|
||||||
public static final String CLUSTER_DEFAULT = "copycat";
|
public static final String CLUSTER_DEFAULT = "copycat";
|
||||||
|
|
||||||
public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect";
|
|
||||||
private static final String ZOOKEEPER_CONNECT_DOC =
|
|
||||||
"Specifies the ZooKeeper connection string in the form "
|
|
||||||
+ "hostname:port where host and port are the host and port of a ZooKeeper server. To allow connecting "
|
|
||||||
+ "through other ZooKeeper nodes when that ZooKeeper machine is down you can also specify multiple hosts "
|
|
||||||
+ "in the form hostname1:port1,hostname2:port2,hostname3:port3.\n"
|
|
||||||
+ "\n"
|
|
||||||
+ "The server may also have a ZooKeeper chroot path as part of it's ZooKeeper connection string which puts "
|
|
||||||
+ "its data under some path in the global ZooKeeper namespace. If so the consumer should use the same "
|
|
||||||
+ "chroot path in its connection string. For example to give a chroot path of /chroot/path you would give "
|
|
||||||
+ "the connection string as hostname1:port1,hostname2:port2,hostname3:port3/chroot/path.";
|
|
||||||
public static final String ZOOKEEPER_CONNECT_DEFAULT = "localhost:2181";
|
|
||||||
|
|
||||||
public static final String ZOOKEEPER_SESSION_TIMEOUT_MS_CONFIG = "zookeeper.session.timeout.ms";
|
|
||||||
private static final String ZOOKEEPER_SESSION_TIMEOUT_MS_DOC
|
|
||||||
= "Session timeout for ZooKeeper connections.";
|
|
||||||
public static final String ZOOKEEPER_SESSION_TIMEOUT_MS_DEFAULT = "30000";
|
|
||||||
|
|
||||||
public static final String ZOOKEEPER_CONNECTION_TIMEOUT_MS_CONFIG
|
|
||||||
= "zookeeper.session.connection.ms";
|
|
||||||
private static final String ZOOKEEPER_CONNECTION_TIMEOUT_MS_DOC
|
|
||||||
= "Connection timeout for ZooKeeper.";
|
|
||||||
public static final String ZOOKEEPER_CONNECTION_TIMEOUT_MS_DEFAULT = "30000";
|
|
||||||
|
|
||||||
public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
|
public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
|
||||||
public static final String BOOSTRAP_SERVERS_DOC
|
public static final String BOOSTRAP_SERVERS_DOC
|
||||||
= "A list of host/port pairs to use for establishing the initial connection to the Kafka "
|
= "A list of host/port pairs to use for establishing the initial connection to the Kafka "
|
||||||
|
@ -159,14 +135,6 @@ public class WorkerConfig extends AbstractConfig {
|
||||||
static {
|
static {
|
||||||
config = new ConfigDef()
|
config = new ConfigDef()
|
||||||
.define(CLUSTER_CONFIG, Type.STRING, CLUSTER_DEFAULT, Importance.HIGH, CLUSTER_CONFIG_DOC)
|
.define(CLUSTER_CONFIG, Type.STRING, CLUSTER_DEFAULT, Importance.HIGH, CLUSTER_CONFIG_DOC)
|
||||||
.define(ZOOKEEPER_CONNECT_CONFIG, Type.STRING, ZOOKEEPER_CONNECT_DEFAULT,
|
|
||||||
Importance.HIGH, ZOOKEEPER_CONNECT_DOC)
|
|
||||||
.define(ZOOKEEPER_SESSION_TIMEOUT_MS_CONFIG, Type.INT,
|
|
||||||
ZOOKEEPER_SESSION_TIMEOUT_MS_DEFAULT,
|
|
||||||
Importance.LOW, ZOOKEEPER_SESSION_TIMEOUT_MS_DOC)
|
|
||||||
.define(ZOOKEEPER_CONNECTION_TIMEOUT_MS_CONFIG, Type.INT,
|
|
||||||
ZOOKEEPER_CONNECTION_TIMEOUT_MS_DEFAULT,
|
|
||||||
Importance.LOW, ZOOKEEPER_CONNECTION_TIMEOUT_MS_DOC)
|
|
||||||
.define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, BOOTSTRAP_SERVERS_DEFAULT,
|
.define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, BOOTSTRAP_SERVERS_DEFAULT,
|
||||||
Importance.HIGH, BOOSTRAP_SERVERS_DOC)
|
Importance.HIGH, BOOSTRAP_SERVERS_DOC)
|
||||||
.define(CONVERTER_CLASS_CONFIG, Type.CLASS, CONVERTER_CLASS_DEFAULT,
|
.define(CONVERTER_CLASS_CONFIG, Type.CLASS, CONVERTER_CLASS_DEFAULT,
|
||||||
|
|
|
@ -19,8 +19,6 @@ package org.apache.kafka.copycat.runtime;
|
||||||
|
|
||||||
import org.apache.kafka.common.utils.SystemTime;
|
import org.apache.kafka.common.utils.SystemTime;
|
||||||
import org.apache.kafka.common.utils.Time;
|
import org.apache.kafka.common.utils.Time;
|
||||||
import kafka.utils.ZKStringSerializer$;
|
|
||||||
import org.I0Itec.zkclient.ZkClient;
|
|
||||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||||
import org.apache.kafka.common.utils.Utils;
|
import org.apache.kafka.common.utils.Utils;
|
||||||
|
@ -62,7 +60,6 @@ public class Worker {
|
||||||
private OffsetDeserializer offsetValueDeserializer;
|
private OffsetDeserializer offsetValueDeserializer;
|
||||||
private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<ConnectorTaskId, WorkerTask>();
|
private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<ConnectorTaskId, WorkerTask>();
|
||||||
private KafkaProducer producer;
|
private KafkaProducer producer;
|
||||||
private ZkClient zkClient;
|
|
||||||
private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
|
private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
|
||||||
|
|
||||||
public Worker(WorkerConfig config) {
|
public Worker(WorkerConfig config) {
|
||||||
|
@ -70,14 +67,12 @@ public class Worker {
|
||||||
Reflection.instantiateConfigurable(
|
Reflection.instantiateConfigurable(
|
||||||
config.getClass(WorkerConfig.OFFSET_STORAGE_CLASS_CONFIG).getName(),
|
config.getClass(WorkerConfig.OFFSET_STORAGE_CLASS_CONFIG).getName(),
|
||||||
OffsetBackingStore.class, config.getUnusedProperties()),
|
OffsetBackingStore.class, config.getUnusedProperties()),
|
||||||
null, null, null, null,
|
null, null, null, null);
|
||||||
createZkClient(config));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Worker(Time time, WorkerConfig config, OffsetBackingStore offsetBackingStore,
|
public Worker(Time time, WorkerConfig config, OffsetBackingStore offsetBackingStore,
|
||||||
OffsetSerializer offsetKeySerializer, OffsetSerializer offsetValueSerializer,
|
OffsetSerializer offsetKeySerializer, OffsetSerializer offsetValueSerializer,
|
||||||
OffsetDeserializer offsetKeyDeserializer, OffsetDeserializer offsetValueDeserializer,
|
OffsetDeserializer offsetKeyDeserializer, OffsetDeserializer offsetValueDeserializer) {
|
||||||
ZkClient zkClient) {
|
|
||||||
this.time = time;
|
this.time = time;
|
||||||
this.config = config;
|
this.config = config;
|
||||||
this.converter = Reflection.instantiate(config.getClass(WorkerConfig.CONVERTER_CLASS_CONFIG).getName(),
|
this.converter = Reflection.instantiate(config.getClass(WorkerConfig.CONVERTER_CLASS_CONFIG).getName(),
|
||||||
|
@ -119,16 +114,6 @@ public class Worker {
|
||||||
OffsetDeserializer.class);
|
OffsetDeserializer.class);
|
||||||
this.offsetValueDeserializer.configure(config.getOriginalProperties(), false);
|
this.offsetValueDeserializer.configure(config.getOriginalProperties(), false);
|
||||||
}
|
}
|
||||||
|
|
||||||
this.zkClient = zkClient;
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
private static ZkClient createZkClient(WorkerConfig config) {
|
|
||||||
return new ZkClient(config.getString(WorkerConfig.ZOOKEEPER_CONNECT_CONFIG),
|
|
||||||
config.getInt(WorkerConfig.ZOOKEEPER_SESSION_TIMEOUT_MS_CONFIG),
|
|
||||||
config.getInt(WorkerConfig.ZOOKEEPER_CONNECTION_TIMEOUT_MS_CONFIG),
|
|
||||||
ZKStringSerializer$.MODULE$);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void start() {
|
public void start() {
|
||||||
|
@ -245,10 +230,6 @@ public class Worker {
|
||||||
tasks.remove(id);
|
tasks.remove(id);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ZkClient getZkClient() {
|
|
||||||
return zkClient;
|
|
||||||
}
|
|
||||||
|
|
||||||
private WorkerTask getTask(ConnectorTaskId id) {
|
private WorkerTask getTask(ConnectorTaskId id) {
|
||||||
WorkerTask task = tasks.get(id);
|
WorkerTask task = tasks.get(id);
|
||||||
if (task == null) {
|
if (task == null) {
|
||||||
|
|
|
@ -157,6 +157,12 @@ public class WorkerSinkTask implements WorkerTask {
|
||||||
}
|
}
|
||||||
|
|
||||||
private KafkaConsumer<Object, Object> createConsumer(Properties taskProps) {
|
private KafkaConsumer<Object, Object> createConsumer(Properties taskProps) {
|
||||||
|
String topicsStr = taskProps.getProperty(SinkTask.TOPICS_CONFIG);
|
||||||
|
if (topicsStr == null || topicsStr.isEmpty()) {
|
||||||
|
throw new CopycatRuntimeException("Sink tasks require a list of topics.");
|
||||||
|
}
|
||||||
|
String[] topics = topicsStr.split(",");
|
||||||
|
|
||||||
// Include any unknown worker configs so consumer configs can be set globally on the worker
|
// Include any unknown worker configs so consumer configs can be set globally on the worker
|
||||||
// and through to the task
|
// and through to the task
|
||||||
Properties props = workerConfig.getUnusedProperties();
|
Properties props = workerConfig.getUnusedProperties();
|
||||||
|
@ -177,17 +183,21 @@ public class WorkerSinkTask implements WorkerTask {
|
||||||
throw new CopycatRuntimeException("Failed to create consumer", t);
|
throw new CopycatRuntimeException("Failed to create consumer", t);
|
||||||
}
|
}
|
||||||
|
|
||||||
List<TopicPartition> topicPartitions = getInputTopicPartitions(taskProps);
|
log.debug("Task {} subscribing to topics {}", id, topics);
|
||||||
log.debug("Task {} subscribing to topic-partitions {}", id, topicPartitions);
|
newConsumer.subscribe(topics);
|
||||||
|
|
||||||
|
// Seek to any user-provided offsets. This is useful if offsets are tracked in the downstream system (e.g., to
|
||||||
|
// enable exactly once delivery to that system).
|
||||||
|
//
|
||||||
|
// To do this correctly, we need to first make sure we have been assigned partitions, which poll() will guarantee.
|
||||||
|
// We ask for offsets after this poll to make sure any offsets committed before the rebalance are picked up correctly.
|
||||||
|
newConsumer.poll(0);
|
||||||
Map<TopicPartition, Long> offsets = context.getOffsets();
|
Map<TopicPartition, Long> offsets = context.getOffsets();
|
||||||
for (TopicPartition tp : topicPartitions) {
|
for (org.apache.kafka.common.TopicPartition kafkatp : newConsumer.subscriptions()) {
|
||||||
org.apache.kafka.common.TopicPartition kafkatp = new
|
TopicPartition tp = new TopicPartition(kafkatp.topic(), kafkatp.partition());
|
||||||
org.apache.kafka.common.TopicPartition(tp.topic(), tp.partition());
|
Long offset = offsets.get(tp);
|
||||||
newConsumer.subscribe(kafkatp);
|
if (offset != null)
|
||||||
if (offsets.containsKey(tp)) {
|
|
||||||
long offset = offsets.get(tp);
|
|
||||||
newConsumer.seek(kafkatp, offset);
|
newConsumer.seek(kafkatp, offset);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return newConsumer;
|
return newConsumer;
|
||||||
}
|
}
|
||||||
|
@ -196,24 +206,6 @@ public class WorkerSinkTask implements WorkerTask {
|
||||||
return new WorkerSinkTaskThread(this, "WorkerSinkTask-" + id, time, workerConfig);
|
return new WorkerSinkTaskThread(this, "WorkerSinkTask-" + id, time, workerConfig);
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<TopicPartition> getInputTopicPartitions(Properties taskProps) {
|
|
||||||
String topicPartitionsStr = taskProps.getProperty(SinkTask.TOPICPARTITIONS_CONFIG);
|
|
||||||
if (topicPartitionsStr == null || topicPartitionsStr.isEmpty()) {
|
|
||||||
throw new CopycatRuntimeException("Sink tasks require a list of topic partitions, which "
|
|
||||||
+ "copycat should generate automatically. This might "
|
|
||||||
+ "indicate your Task class inherits from SinkTask, but "
|
|
||||||
+ "your Connector class does not inherit from "
|
|
||||||
+ "SinkConnector.");
|
|
||||||
}
|
|
||||||
|
|
||||||
List<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
|
|
||||||
for (String topicPartitionStr : Arrays.asList(topicPartitionsStr.split(","))) {
|
|
||||||
topicPartitions.add(new TopicPartition(topicPartitionStr));
|
|
||||||
}
|
|
||||||
return topicPartitions;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
private void deliverMessages(ConsumerRecords<Object, Object> msgs) {
|
private void deliverMessages(ConsumerRecords<Object, Object> msgs) {
|
||||||
// Finally, deliver this batch to the sink
|
// Finally, deliver this batch to the sink
|
||||||
if (msgs.count() > 0) {
|
if (msgs.count() > 0) {
|
||||||
|
|
|
@ -17,12 +17,7 @@
|
||||||
|
|
||||||
package org.apache.kafka.copycat.runtime.standalone;
|
package org.apache.kafka.copycat.runtime.standalone;
|
||||||
|
|
||||||
import org.I0Itec.zkclient.ZkClient;
|
|
||||||
import org.apache.kafka.copycat.connector.ConnectorContext;
|
import org.apache.kafka.copycat.connector.ConnectorContext;
|
||||||
import org.apache.kafka.copycat.connector.TopicPartition;
|
|
||||||
import org.apache.kafka.copycat.util.KafkaUtils;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* ConnectorContext for use with the StandaloneCoordinator, which maintains all connectors and tasks
|
* ConnectorContext for use with the StandaloneCoordinator, which maintains all connectors and tasks
|
||||||
|
@ -32,13 +27,10 @@ public class StandaloneConnectorContext implements ConnectorContext {
|
||||||
|
|
||||||
private StandaloneCoordinator coordinator;
|
private StandaloneCoordinator coordinator;
|
||||||
private String connectorName;
|
private String connectorName;
|
||||||
private ZkClient zkClient;
|
|
||||||
|
|
||||||
public StandaloneConnectorContext(StandaloneCoordinator coordinator, String connectorName,
|
public StandaloneConnectorContext(StandaloneCoordinator coordinator, String connectorName) {
|
||||||
ZkClient zkClient) {
|
|
||||||
this.coordinator = coordinator;
|
this.coordinator = coordinator;
|
||||||
this.connectorName = connectorName;
|
this.connectorName = connectorName;
|
||||||
this.zkClient = zkClient;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -47,14 +39,4 @@ public class StandaloneConnectorContext implements ConnectorContext {
|
||||||
// process
|
// process
|
||||||
coordinator.requestTaskReconfiguration(connectorName);
|
coordinator.requestTaskReconfiguration(connectorName);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public List<TopicPartition> getTopicPartitions(String... topics) {
|
|
||||||
return KafkaUtils.getTopicPartitions(zkClient, topics);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public List<TopicPartition> getTopicPartitions(List<String> topics) {
|
|
||||||
return KafkaUtils.getTopicPartitions(zkClient, topics);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.kafka.copycat.runtime.standalone;
|
||||||
|
|
||||||
import org.apache.kafka.common.utils.Utils;
|
import org.apache.kafka.common.utils.Utils;
|
||||||
import org.apache.kafka.copycat.connector.Connector;
|
import org.apache.kafka.copycat.connector.Connector;
|
||||||
import org.apache.kafka.copycat.connector.TopicPartition;
|
|
||||||
import org.apache.kafka.copycat.errors.CopycatException;
|
import org.apache.kafka.copycat.errors.CopycatException;
|
||||||
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
|
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
|
||||||
import org.apache.kafka.copycat.runtime.ConnectorConfig;
|
import org.apache.kafka.copycat.runtime.ConnectorConfig;
|
||||||
|
@ -141,7 +140,7 @@ public class StandaloneCoordinator implements Coordinator {
|
||||||
// may be caused by user code
|
// may be caused by user code
|
||||||
throw new CopycatRuntimeException("Failed to create connector instance", t);
|
throw new CopycatRuntimeException("Failed to create connector instance", t);
|
||||||
}
|
}
|
||||||
connector.initialize(new StandaloneConnectorContext(this, connName, worker.getZkClient()));
|
connector.initialize(new StandaloneConnectorContext(this, connName));
|
||||||
try {
|
try {
|
||||||
connector.start(configs);
|
connector.start(configs);
|
||||||
} catch (CopycatException e) {
|
} catch (CopycatException e) {
|
||||||
|
@ -194,32 +193,21 @@ public class StandaloneCoordinator implements Coordinator {
|
||||||
|
|
||||||
log.info("Creating tasks for connector {} of type {}", state.name, taskClassName);
|
log.info("Creating tasks for connector {} of type {}", state.name, taskClassName);
|
||||||
|
|
||||||
int maxTasks = state.maxTasks;
|
List<Properties> taskConfigs = state.connector.getTaskConfigs(state.maxTasks);
|
||||||
// For sink tasks, we may also be limited by the number of input topic partitions
|
|
||||||
List<TopicPartition> topicPartitions = null;
|
|
||||||
if (state.connector instanceof SinkConnector) {
|
|
||||||
topicPartitions = KafkaUtils.getTopicPartitions(worker.getZkClient(), state.inputTopics);
|
|
||||||
maxTasks = Math.min(maxTasks, topicPartitions.size());
|
|
||||||
}
|
|
||||||
List<Properties> taskConfigs = state.connector.getTaskConfigs(maxTasks);
|
|
||||||
|
|
||||||
// If necessary, figure out how to distribute input topic partitions
|
|
||||||
// TODO: This needs to run periodically so we detect new partitions
|
|
||||||
List<List<TopicPartition>> taskAssignments = null;
|
|
||||||
if (state.connector instanceof SinkConnector) {
|
|
||||||
taskAssignments = ConnectorUtils.groupPartitions(topicPartitions, taskConfigs.size());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Generate the final configs, including framework provided settings
|
// Generate the final configs, including framework provided settings
|
||||||
Map<ConnectorTaskId, Properties> taskProps = new HashMap<ConnectorTaskId, Properties>();
|
Map<ConnectorTaskId, Properties> taskProps = new HashMap<ConnectorTaskId, Properties>();
|
||||||
for (int i = 0; i < taskConfigs.size(); i++) {
|
for (int i = 0; i < taskConfigs.size(); i++) {
|
||||||
ConnectorTaskId taskId = new ConnectorTaskId(state.name, i);
|
ConnectorTaskId taskId = new ConnectorTaskId(state.name, i);
|
||||||
Properties config = taskConfigs.get(i);
|
Properties config = taskConfigs.get(i);
|
||||||
|
// TODO: This probably shouldn't be in the Coordinator. It's nice to have Copycat ensure the list of topics
|
||||||
|
// is automatically provided to tasks since it is required by the framework, but this
|
||||||
|
String subscriptionTopics = Utils.join(state.inputTopics, ",");
|
||||||
if (state.connector instanceof SinkConnector) {
|
if (state.connector instanceof SinkConnector) {
|
||||||
// Make sure we don't modify the original since the connector may reuse it internally
|
// Make sure we don't modify the original since the connector may reuse it internally
|
||||||
Properties configForSink = new Properties();
|
Properties configForSink = new Properties();
|
||||||
configForSink.putAll(config);
|
configForSink.putAll(config);
|
||||||
configForSink.setProperty(SinkTask.TOPICPARTITIONS_CONFIG, Utils.join(taskAssignments.get(i), ","));
|
configForSink.setProperty(SinkTask.TOPICS_CONFIG, subscriptionTopics);
|
||||||
config = configForSink;
|
config = configForSink;
|
||||||
}
|
}
|
||||||
taskProps.put(taskId, config);
|
taskProps.put(taskId, config);
|
||||||
|
|
|
@ -1,53 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
* <p/>
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
* <p/>
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
**/
|
|
||||||
|
|
||||||
package org.apache.kafka.copycat.util;
|
|
||||||
|
|
||||||
import kafka.utils.ZkUtils;
|
|
||||||
import org.I0Itec.zkclient.ZkClient;
|
|
||||||
import org.apache.kafka.copycat.connector.TopicPartition;
|
|
||||||
import scala.collection.Seq;
|
|
||||||
import scala.collection.mutable.Map;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import static scala.collection.JavaConversions.*;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Some utilities for working with Kafka that aren't included with Kafka itself.
|
|
||||||
*/
|
|
||||||
public class KafkaUtils {
|
|
||||||
|
|
||||||
public static List<TopicPartition> getTopicPartitions(ZkClient zkClient, String... topics) {
|
|
||||||
return getTopicPartitions(zkClient, Arrays.asList(topics));
|
|
||||||
}
|
|
||||||
|
|
||||||
public static List<TopicPartition> getTopicPartitions(ZkClient zkClient, List<String> topics) {
|
|
||||||
Seq<String> scalaTopics = asScalaIterable(topics).toSeq();
|
|
||||||
List<TopicPartition> result = new ArrayList<TopicPartition>();
|
|
||||||
Map<String, Seq<Object>> partitionsByTopic
|
|
||||||
= ZkUtils.getPartitionsForTopics(zkClient, scalaTopics);
|
|
||||||
for (java.util.Map.Entry<String, Seq<Object>> entry : asJavaMap(partitionsByTopic).entrySet()) {
|
|
||||||
for (Object partition : asJavaIterable(entry.getValue())) {
|
|
||||||
result.add(new TopicPartition(entry.getKey(), (Integer) partition));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -25,7 +25,6 @@ import org.apache.kafka.copycat.data.GenericRecord;
|
||||||
import org.apache.kafka.copycat.data.GenericRecordBuilder;
|
import org.apache.kafka.copycat.data.GenericRecordBuilder;
|
||||||
import org.apache.kafka.copycat.data.Schema;
|
import org.apache.kafka.copycat.data.Schema;
|
||||||
import org.apache.kafka.copycat.data.SchemaBuilder;
|
import org.apache.kafka.copycat.data.SchemaBuilder;
|
||||||
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
|
|
||||||
import org.apache.kafka.copycat.sink.SinkRecord;
|
import org.apache.kafka.copycat.sink.SinkRecord;
|
||||||
import org.apache.kafka.copycat.sink.SinkTask;
|
import org.apache.kafka.copycat.sink.SinkTask;
|
||||||
import org.apache.kafka.copycat.sink.SinkTaskContext;
|
import org.apache.kafka.copycat.sink.SinkTaskContext;
|
||||||
|
@ -88,21 +87,9 @@ public class WorkerSinkTaskTest extends ThreadedTest {
|
||||||
recordsReturned = 0;
|
recordsReturned = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testGetInputTopicPartitions() throws Exception {
|
|
||||||
Properties props = new Properties();
|
|
||||||
props.setProperty(SinkTask.TOPICPARTITIONS_CONFIG, "topic-1,foo-2");
|
|
||||||
assertEquals(
|
|
||||||
Arrays.asList(new org.apache.kafka.copycat.connector.TopicPartition("topic", 1),
|
|
||||||
new org.apache.kafka.copycat.connector.TopicPartition("foo", 2)),
|
|
||||||
Whitebox.invokeMethod(workerTask, "getInputTopicPartitions", props)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPollsInBackground() throws Exception {
|
public void testPollsInBackground() throws Exception {
|
||||||
Properties taskProps = new Properties();
|
Properties taskProps = new Properties();
|
||||||
taskProps.setProperty(SinkTask.TOPICPARTITIONS_CONFIG, TOPIC_PARTITION_STR);
|
|
||||||
|
|
||||||
expectInitializeTask(taskProps);
|
expectInitializeTask(taskProps);
|
||||||
Capture<Collection<SinkRecord>> capturedRecords = expectPolls(1L);
|
Capture<Collection<SinkRecord>> capturedRecords = expectPolls(1L);
|
||||||
|
@ -170,7 +157,6 @@ public class WorkerSinkTaskTest extends ThreadedTest {
|
||||||
@Test
|
@Test
|
||||||
public void testCommit() throws Exception {
|
public void testCommit() throws Exception {
|
||||||
Properties taskProps = new Properties();
|
Properties taskProps = new Properties();
|
||||||
taskProps.setProperty(SinkTask.TOPICPARTITIONS_CONFIG, TOPIC_PARTITION_STR);
|
|
||||||
|
|
||||||
expectInitializeTask(taskProps);
|
expectInitializeTask(taskProps);
|
||||||
// Make each poll() take the offset commit interval
|
// Make each poll() take the offset commit interval
|
||||||
|
@ -199,7 +185,6 @@ public class WorkerSinkTaskTest extends ThreadedTest {
|
||||||
@Test
|
@Test
|
||||||
public void testCommitTaskFlushFailure() throws Exception {
|
public void testCommitTaskFlushFailure() throws Exception {
|
||||||
Properties taskProps = new Properties();
|
Properties taskProps = new Properties();
|
||||||
taskProps.setProperty(SinkTask.TOPICPARTITIONS_CONFIG, TOPIC_PARTITION_STR);
|
|
||||||
|
|
||||||
expectInitializeTask(taskProps);
|
expectInitializeTask(taskProps);
|
||||||
Capture<Collection<SinkRecord>> capturedRecords
|
Capture<Collection<SinkRecord>> capturedRecords
|
||||||
|
@ -224,7 +209,6 @@ public class WorkerSinkTaskTest extends ThreadedTest {
|
||||||
@Test
|
@Test
|
||||||
public void testCommitConsumerFailure() throws Exception {
|
public void testCommitConsumerFailure() throws Exception {
|
||||||
Properties taskProps = new Properties();
|
Properties taskProps = new Properties();
|
||||||
taskProps.setProperty(SinkTask.TOPICPARTITIONS_CONFIG, TOPIC_PARTITION_STR);
|
|
||||||
|
|
||||||
expectInitializeTask(taskProps);
|
expectInitializeTask(taskProps);
|
||||||
Capture<Collection<SinkRecord>> capturedRecords
|
Capture<Collection<SinkRecord>> capturedRecords
|
||||||
|
@ -250,7 +234,6 @@ public class WorkerSinkTaskTest extends ThreadedTest {
|
||||||
@Test
|
@Test
|
||||||
public void testCommitTimeout() throws Exception {
|
public void testCommitTimeout() throws Exception {
|
||||||
Properties taskProps = new Properties();
|
Properties taskProps = new Properties();
|
||||||
taskProps.setProperty(SinkTask.TOPICPARTITIONS_CONFIG, TOPIC_PARTITION_STR);
|
|
||||||
|
|
||||||
expectInitializeTask(taskProps);
|
expectInitializeTask(taskProps);
|
||||||
// Cut down amount of time to pass in each poll so we trigger exactly 1 offset commit
|
// Cut down amount of time to pass in each poll so we trigger exactly 1 offset commit
|
||||||
|
@ -277,34 +260,6 @@ public class WorkerSinkTaskTest extends ThreadedTest {
|
||||||
PowerMock.verifyAll();
|
PowerMock.verifyAll();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testGetInputPartitionsSingle() throws Exception {
|
|
||||||
Properties taskProps = new Properties();
|
|
||||||
taskProps.setProperty(SinkTask.TOPICPARTITIONS_CONFIG, "test-1");
|
|
||||||
|
|
||||||
assertEquals(Arrays.asList(new org.apache.kafka.copycat.connector.TopicPartition("test", 1)),
|
|
||||||
Whitebox.invokeMethod(workerTask, "getInputTopicPartitions", taskProps));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testGetInputPartitionsList() throws Exception {
|
|
||||||
Properties taskProps = new Properties();
|
|
||||||
taskProps.setProperty(SinkTask.TOPICPARTITIONS_CONFIG, "test-1,foo-2,bar-3");
|
|
||||||
|
|
||||||
assertEquals(Arrays.asList(
|
|
||||||
new org.apache.kafka.copycat.connector.TopicPartition("test", 1),
|
|
||||||
new org.apache.kafka.copycat.connector.TopicPartition("foo", 2),
|
|
||||||
new org.apache.kafka.copycat.connector.TopicPartition("bar", 3)),
|
|
||||||
Whitebox.invokeMethod(workerTask, "getInputTopicPartitions", taskProps));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(expected = CopycatRuntimeException.class)
|
|
||||||
public void testGetInputPartitionsMissing() throws Exception {
|
|
||||||
// Missing setting
|
|
||||||
Whitebox.invokeMethod(workerTask, "getInputTopicPartitions", new Properties());
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
private KafkaConsumer<Object, Object> expectInitializeTask(Properties taskProps)
|
private KafkaConsumer<Object, Object> expectInitializeTask(Properties taskProps)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
sinkTask.initialize(EasyMock.anyObject(SinkTaskContext.class));
|
sinkTask.initialize(EasyMock.anyObject(SinkTaskContext.class));
|
||||||
|
|
|
@ -18,7 +18,6 @@
|
||||||
package org.apache.kafka.copycat.runtime;
|
package org.apache.kafka.copycat.runtime;
|
||||||
|
|
||||||
import org.apache.kafka.common.utils.Time;
|
import org.apache.kafka.common.utils.Time;
|
||||||
import org.I0Itec.zkclient.ZkClient;
|
|
||||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||||
import org.apache.kafka.copycat.cli.WorkerConfig;
|
import org.apache.kafka.copycat.cli.WorkerConfig;
|
||||||
import org.apache.kafka.copycat.errors.CopycatException;
|
import org.apache.kafka.copycat.errors.CopycatException;
|
||||||
|
@ -62,11 +61,9 @@ public class WorkerTest extends ThreadedTest {
|
||||||
Properties workerProps = new Properties();
|
Properties workerProps = new Properties();
|
||||||
workerProps.setProperty("schema.registry.url", "http://localhost:8081");
|
workerProps.setProperty("schema.registry.url", "http://localhost:8081");
|
||||||
WorkerConfig config = new WorkerConfig(workerProps);
|
WorkerConfig config = new WorkerConfig(workerProps);
|
||||||
ZkClient zkClient = PowerMock.createMock(ZkClient.class);
|
|
||||||
worker = new Worker(new MockTime(), config, offsetBackingStore,
|
worker = new Worker(new MockTime(), config, offsetBackingStore,
|
||||||
offsetKeySerializer, offsetValueSerializer,
|
offsetKeySerializer, offsetValueSerializer,
|
||||||
offsetKeyDeserializer, offsetValueDeserializer,
|
offsetKeyDeserializer, offsetValueDeserializer);
|
||||||
zkClient);
|
|
||||||
worker.start();
|
worker.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -21,7 +21,6 @@ import org.apache.kafka.copycat.runtime.ConnectorConfig;
|
||||||
import org.apache.kafka.copycat.runtime.Worker;
|
import org.apache.kafka.copycat.runtime.Worker;
|
||||||
import org.apache.kafka.copycat.sink.SinkConnector;
|
import org.apache.kafka.copycat.sink.SinkConnector;
|
||||||
import org.apache.kafka.copycat.util.Callback;
|
import org.apache.kafka.copycat.util.Callback;
|
||||||
import org.apache.kafka.copycat.util.KafkaUtils;
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -35,7 +34,7 @@ import java.io.File;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
|
||||||
@RunWith(PowerMockRunner.class)
|
@RunWith(PowerMockRunner.class)
|
||||||
@PrepareForTest({StandaloneCoordinator.class, KafkaUtils.class})
|
@PrepareForTest({StandaloneCoordinator.class})
|
||||||
@PowerMockIgnore("javax.management.*")
|
@PowerMockIgnore("javax.management.*")
|
||||||
public class StandaloneCoordinatorRestoreTest extends StandaloneCoordinatorTestBase {
|
public class StandaloneCoordinatorRestoreTest extends StandaloneCoordinatorTestBase {
|
||||||
private File coordinatorConfigFile;
|
private File coordinatorConfigFile;
|
||||||
|
@ -57,7 +56,6 @@ public class StandaloneCoordinatorRestoreTest extends StandaloneCoordinatorTestB
|
||||||
connectorProps.setProperty(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
|
connectorProps.setProperty(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
|
||||||
connectorProps.setProperty(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR);
|
connectorProps.setProperty(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR);
|
||||||
PowerMock.mockStatic(StandaloneCoordinator.class);
|
PowerMock.mockStatic(StandaloneCoordinator.class);
|
||||||
PowerMock.mockStatic(KafkaUtils.class);
|
|
||||||
|
|
||||||
// These can be anything since connectors can pass along whatever they want.
|
// These can be anything since connectors can pass along whatever they want.
|
||||||
taskProps = new Properties();
|
taskProps = new Properties();
|
||||||
|
|
|
@ -22,7 +22,6 @@ import org.apache.kafka.copycat.runtime.Worker;
|
||||||
import org.apache.kafka.copycat.sink.SinkConnector;
|
import org.apache.kafka.copycat.sink.SinkConnector;
|
||||||
import org.apache.kafka.copycat.util.Callback;
|
import org.apache.kafka.copycat.util.Callback;
|
||||||
import org.apache.kafka.copycat.util.FutureCallback;
|
import org.apache.kafka.copycat.util.FutureCallback;
|
||||||
import org.apache.kafka.copycat.util.KafkaUtils;
|
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
|
@ -35,7 +34,7 @@ import java.util.Properties;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
@RunWith(PowerMockRunner.class)
|
@RunWith(PowerMockRunner.class)
|
||||||
@PrepareForTest({StandaloneCoordinator.class, KafkaUtils.class})
|
@PrepareForTest({StandaloneCoordinator.class})
|
||||||
@PowerMockIgnore("javax.management.*")
|
@PowerMockIgnore("javax.management.*")
|
||||||
public class StandaloneCoordinatorTest extends StandaloneCoordinatorTestBase {
|
public class StandaloneCoordinatorTest extends StandaloneCoordinatorTestBase {
|
||||||
|
|
||||||
|
@ -49,7 +48,6 @@ public class StandaloneCoordinatorTest extends StandaloneCoordinatorTestBase {
|
||||||
connectorProps.setProperty(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
|
connectorProps.setProperty(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
|
||||||
connectorProps.setProperty(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR);
|
connectorProps.setProperty(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR);
|
||||||
PowerMock.mockStatic(StandaloneCoordinator.class);
|
PowerMock.mockStatic(StandaloneCoordinator.class);
|
||||||
PowerMock.mockStatic(KafkaUtils.class);
|
|
||||||
|
|
||||||
// These can be anything since connectors can pass along whatever they want.
|
// These can be anything since connectors can pass along whatever they want.
|
||||||
taskProps = new Properties();
|
taskProps = new Properties();
|
||||||
|
|
|
@ -17,7 +17,6 @@
|
||||||
|
|
||||||
package org.apache.kafka.copycat.runtime.standalone;
|
package org.apache.kafka.copycat.runtime.standalone;
|
||||||
|
|
||||||
import org.I0Itec.zkclient.ZkClient;
|
|
||||||
import org.apache.kafka.copycat.connector.Connector;
|
import org.apache.kafka.copycat.connector.Connector;
|
||||||
import org.apache.kafka.copycat.connector.Task;
|
import org.apache.kafka.copycat.connector.Task;
|
||||||
import org.apache.kafka.copycat.connector.TopicPartition;
|
import org.apache.kafka.copycat.connector.TopicPartition;
|
||||||
|
@ -30,22 +29,16 @@ import org.apache.kafka.copycat.source.SourceConnector;
|
||||||
import org.apache.kafka.copycat.source.SourceTask;
|
import org.apache.kafka.copycat.source.SourceTask;
|
||||||
import org.apache.kafka.copycat.util.Callback;
|
import org.apache.kafka.copycat.util.Callback;
|
||||||
import org.apache.kafka.copycat.util.ConnectorTaskId;
|
import org.apache.kafka.copycat.util.ConnectorTaskId;
|
||||||
import org.apache.kafka.copycat.util.KafkaUtils;
|
|
||||||
import org.easymock.EasyMock;
|
import org.easymock.EasyMock;
|
||||||
import org.powermock.api.easymock.PowerMock;
|
import org.powermock.api.easymock.PowerMock;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
|
||||||
public class StandaloneCoordinatorTestBase {
|
public class StandaloneCoordinatorTestBase {
|
||||||
|
|
||||||
protected static final String CONNECTOR_NAME = "test";
|
protected static final String CONNECTOR_NAME = "test";
|
||||||
protected static final String TOPICS_LIST_STR = "topic1,topic2";
|
protected static final String TOPICS_LIST_STR = "topic1,topic2";
|
||||||
protected static final List<String> TOPICS_LIST = Arrays.asList("topic1", "topic2");
|
|
||||||
protected static final List<TopicPartition> TOPIC_PARTITIONS = Arrays.asList(
|
|
||||||
new TopicPartition("topic1", 1), new TopicPartition("topic2", 1));
|
|
||||||
protected static final String TOPIC_PARTITIONS_STR = "topic1-1,topic2-1";
|
|
||||||
|
|
||||||
protected StandaloneCoordinator coordinator;
|
protected StandaloneCoordinator coordinator;
|
||||||
protected Worker worker;
|
protected Worker worker;
|
||||||
|
@ -79,8 +72,6 @@ public class StandaloneCoordinatorTestBase {
|
||||||
PowerMock.expectLastCall();
|
PowerMock.expectLastCall();
|
||||||
}
|
}
|
||||||
|
|
||||||
ZkClient zkClient = PowerMock.createMock(ZkClient.class);
|
|
||||||
EasyMock.expect(worker.getZkClient()).andStubReturn(zkClient);
|
|
||||||
connector.initialize(EasyMock.anyObject(StandaloneConnectorContext.class));
|
connector.initialize(EasyMock.anyObject(StandaloneConnectorContext.class));
|
||||||
PowerMock.expectLastCall();
|
PowerMock.expectLastCall();
|
||||||
connector.start(new Properties());
|
connector.start(new Properties());
|
||||||
|
@ -89,20 +80,14 @@ public class StandaloneCoordinatorTestBase {
|
||||||
// Just return the connector properties for the individual task we generate by default
|
// Just return the connector properties for the individual task we generate by default
|
||||||
EasyMock.<Class<? extends Task>>expect(connector.getTaskClass()).andReturn(taskClass);
|
EasyMock.<Class<? extends Task>>expect(connector.getTaskClass()).andReturn(taskClass);
|
||||||
|
|
||||||
if (sink) {
|
|
||||||
EasyMock.expect(KafkaUtils.getTopicPartitions(zkClient, TOPICS_LIST))
|
|
||||||
.andReturn(TOPIC_PARTITIONS);
|
|
||||||
}
|
|
||||||
|
|
||||||
EasyMock.expect(connector.getTaskConfigs(ConnectorConfig.TASKS_MAX_DEFAULT))
|
EasyMock.expect(connector.getTaskConfigs(ConnectorConfig.TASKS_MAX_DEFAULT))
|
||||||
.andReturn(Arrays.asList(taskProps));
|
.andReturn(Arrays.asList(taskProps));
|
||||||
// And we should instantiate the tasks. For a sink task, we should see added properties for
|
// And we should instantiate the tasks. For a sink task, we should see added properties for
|
||||||
// the input topic partitions
|
// the input topic partitions
|
||||||
Properties generatedTaskProps = new Properties();
|
Properties generatedTaskProps = new Properties();
|
||||||
generatedTaskProps.putAll(taskProps);
|
generatedTaskProps.putAll(taskProps);
|
||||||
if (sink) {
|
if (sink)
|
||||||
generatedTaskProps.setProperty(SinkTask.TOPICPARTITIONS_CONFIG, TOPIC_PARTITIONS_STR);
|
generatedTaskProps.setProperty(SinkTask.TOPICS_CONFIG, TOPICS_LIST_STR);
|
||||||
}
|
|
||||||
worker.addTask(new ConnectorTaskId(CONNECTOR_NAME, 0), taskClass.getName(), generatedTaskProps);
|
worker.addTask(new ConnectorTaskId(CONNECTOR_NAME, 0), taskClass.getName(), generatedTaskProps);
|
||||||
PowerMock.expectLastCall();
|
PowerMock.expectLastCall();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue