Make Converter generic to match the serializers, since some serialization formats do not use Object as a meaningful base class for their native representation; update many other classes with generic key and value type parameters to match this change.

Ewen Cheslack-Postava 2015-08-01 23:23:50 -06:00
parent b194c7348f
commit 6787a85e87
22 changed files with 173 additions and 175 deletions
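
The heart of the change: a converter's native type now flows through a type parameter instead of being erased to Object, so it can line up statically with the matching Serializer<T>. A minimal sketch of that pairing (JsonSerializer here is an assumed companion class, not shown in this diff; serializer configuration is omitted):

    import com.fasterxml.jackson.databind.JsonNode;
    import org.apache.kafka.common.serialization.Serializer;
    import org.apache.kafka.copycat.json.JsonConverter;
    import org.apache.kafka.copycat.json.JsonSerializer; // assumed companion class
    import org.apache.kafka.copycat.storage.Converter;

    class TypedPipelineSketch {
        // Copycat data -> typed native object -> bytes, with no casts along the way.
        static byte[] toBytes(Object copycatValue) {
            Converter<JsonNode> converter = new JsonConverter();
            JsonNode nativeValue = converter.fromCopycatData(copycatValue); // typed result
            Serializer<JsonNode> serializer = new JsonSerializer();         // assumed type
            return serializer.serialize("some-topic", nativeValue);
        }
    }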

View File: SourceTask.java

@@ -24,7 +24,7 @@ import java.util.List;
 /**
  * SourceTask is a Task that pulls records from another system for storage in Kafka.
  */
-public abstract class SourceTask<K, V> implements Task {
+public abstract class SourceTask implements Task {
     protected SourceTaskContext context;

View File: Converter.java

@@ -24,19 +24,19 @@ package org.apache.kafka.copycat.storage;
  * component -- the producer or consumer serializer or deserializer for records or a Copycat
  * serializer or deserializer for offsets.
  */
-public interface Converter {
+public interface Converter<T> {
     /**
      * Convert a Copycat data object to a native object for serialization.
      * @param value
      * @return
      */
-    Object fromCopycatData(Object value);
+    T fromCopycatData(Object value);

     /**
      * Convert a native object to a Copycat data object.
      * @param value
      * @return
      */
-    Object toCopycatData(Object value);
+    Object toCopycatData(T value);
 }
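
Under the new contract, an implementation commits to a concrete native type. A minimal hypothetical example (StringConverter is illustrative only, not part of this commit):

    import org.apache.kafka.copycat.storage.Converter;

    // Hypothetical converter whose native type is String.
    class StringConverter implements Converter<String> {
        @Override
        public String fromCopycatData(Object value) {
            // Copycat data in, typed native String out; callers need no cast.
            return value == null ? null : value.toString();
        }

        @Override
        public Object toCopycatData(String value) {
            // Native String in, Copycat data out.
            return value;
        }
    }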

View File: FileStreamSourceTask.java

@@ -32,7 +32,7 @@ import java.util.Properties;
 /**
  * FileStreamSourceTask reads from stdin or a file.
  */
-public class FileStreamSourceTask extends SourceTask<Object, Object> {
+public class FileStreamSourceTask extends SourceTask {
     private static final Logger log = LoggerFactory.getLogger(FileStreamSourceTask.class);
     private InputStream stream;

View File: JsonConverter.java

@@ -32,7 +32,7 @@ import java.util.*;
 /**
  * Implementation of Converter that uses JSON to store schemas and objects.
  */
-public class JsonConverter implements Converter {
+public class JsonConverter implements Converter<JsonNode> {
     private static final HashMap<String, JsonToCopycatTypeConverter> TO_COPYCAT_CONVERTERS
             = new HashMap<>();
@@ -106,15 +106,11 @@ public class JsonConverter implements Converter {
     }

     @Override
-    public Object toCopycatData(Object value) {
-        if (!(value instanceof JsonNode))
-            throw new CopycatRuntimeException("JsonConvert can only convert JsonNode objects.");
-        JsonNode data = (JsonNode) value;
-        if (!data.isObject() || data.size() != 2 || !data.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !data.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))
-            throw new CopycatRuntimeException("JSON data converted to Copycat must be in envelope containing schema");
-        return convertToCopycat(data.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME), data.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
+    public Object toCopycatData(JsonNode value) {
+        if (!value.isObject() || value.size() != 2 || !value.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !value.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))
+            throw new CopycatRuntimeException("JSON value converted to Copycat must be in envelope containing schema");
+        return convertToCopycat(value.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME), value.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
     }
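
For reference, the rewritten method accepts only a two-field envelope. A sketch of an input that would pass the check, assuming JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME and JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME resolve to "schema" and "payload" (the schema shape itself is illustrative and depends on the types JsonConverter supports):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ObjectNode;
    import org.apache.kafka.copycat.json.JsonConverter;

    class EnvelopeSketch {
        static Object example() {
            ObjectMapper mapper = new ObjectMapper();
            ObjectNode envelope = mapper.createObjectNode(); // must contain exactly two fields
            envelope.set("schema", mapper.createObjectNode().put("type", "int")); // assumed field name
            envelope.put("payload", 42);                                          // assumed field name
            return new JsonConverter().toCopycatData(envelope);
        }
    }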

View File: Copycat.java

@@ -71,7 +71,7 @@ public class Copycat {
         try {
             // Destroy any requested connectors
             for (final String connName : config.getList(CopycatConfig.DELETE_CONNECTORS_CONFIG)) {
-                FutureCallback cb = new FutureCallback(new Callback<Void>() {
+                FutureCallback<Void> cb = new FutureCallback<>(new Callback<Void>() {
                     @Override
                     public void onCompletion(Throwable error, Void result) {
                         if (error != null)
@@ -85,7 +85,7 @@ public class Copycat {
             // Create any new connectors
             for (final String connectorPropsFile : config.getList(CopycatConfig.CREATE_CONNECTORS_CONFIG)) {
                 connectorProps = Utils.loadProps(connectorPropsFile);
-                FutureCallback cb = new FutureCallback(new Callback<String>() {
+                FutureCallback<String> cb = new FutureCallback<>(new Callback<String>() {
                     @Override
                     public void onCompletion(Throwable error, String id) {
                         if (error != null)

View File: WorkerConfig.java

@@ -128,27 +128,20 @@ public class WorkerConfig extends AbstractConfig {
                 Importance.LOW, OFFSET_COMMIT_TIMEOUT_MS_DOC);
     }

-    private Properties originalProperties;
-
     public WorkerConfig() {
         this(new Properties());
     }

     public WorkerConfig(Properties props) {
         super(config, props);
-        this.originalProperties = props;
     }

     public Properties getUnusedProperties() {
         Set<String> unusedKeys = this.unused();
         Properties unusedProps = new Properties();
         for (String key : unusedKeys) {
-            unusedProps.setProperty(key, originalProperties.getProperty(key));
+            unusedProps.put(key, this.originals().get(key));
         }
         return unusedProps;
     }
-
-    public Properties getOriginalProperties() {
-        return originalProperties;
-    }
 }
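
The deleted bookkeeping works because AbstractConfig already retains the configuration it was constructed with, exposed via originals(). A short sketch of the equivalence (the property name is taken from the test setup elsewhere in this commit; the WorkerConfig package is inferred from nearby imports):

    import java.util.Properties;
    import org.apache.kafka.copycat.runtime.WorkerConfig;

    class OriginalsSketch {
        static Object lookup() {
            Properties props = new Properties();
            props.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
            WorkerConfig config = new WorkerConfig(props);
            // originals() replaces the removed getOriginalProperties()
            return config.originals().get("key.deserializer");
        }
    }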

View File: Worker.java

@@ -48,20 +48,20 @@ import java.util.Properties;
  * Since each task has a dedicated thread, this is mainly just a container for them.
  * </p>
  */
-public class Worker {
+public class Worker<K, V> {
     private static final Logger log = LoggerFactory.getLogger(Worker.class);

     private Time time;
     private WorkerConfig config;
-    private Converter keyConverter;
-    private Converter valueConverter;
+    private Converter<K> keyConverter;
+    private Converter<V> valueConverter;
     private OffsetBackingStore offsetBackingStore;
-    private Serializer offsetKeySerializer;
-    private Serializer offsetValueSerializer;
-    private Deserializer offsetKeyDeserializer;
-    private Deserializer offsetValueDeserializer;
+    private Serializer<K> offsetKeySerializer;
+    private Serializer<V> offsetValueSerializer;
+    private Deserializer<K> offsetKeyDeserializer;
+    private Deserializer<V> offsetValueDeserializer;
     private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>();
-    private KafkaProducer producer;
+    private KafkaProducer<K, V> producer;
     private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;

     public Worker(WorkerConfig config) {
@@ -70,6 +70,7 @@ public class Worker {
                 null, null, null, null);
     }

+    @SuppressWarnings("unchecked")
     public Worker(Time time, WorkerConfig config, OffsetBackingStore offsetBackingStore,
                   Serializer offsetKeySerializer, Serializer offsetValueSerializer,
                   Deserializer offsetKeyDeserializer, Deserializer offsetValueDeserializer) {
@@ -83,28 +84,28 @@ public class Worker {
             this.offsetKeySerializer = offsetKeySerializer;
         } else {
             this.offsetKeySerializer = config.getConfiguredInstance(WorkerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
-            this.offsetKeySerializer.configure(config.getOriginalProperties(), true);
+            this.offsetKeySerializer.configure(config.originals(), true);
         }

         if (offsetValueSerializer != null) {
             this.offsetValueSerializer = offsetValueSerializer;
         } else {
             this.offsetValueSerializer = config.getConfiguredInstance(WorkerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
-            this.offsetValueSerializer.configure(config.getOriginalProperties(), false);
+            this.offsetValueSerializer.configure(config.originals(), false);
         }

         if (offsetKeyDeserializer != null) {
             this.offsetKeyDeserializer = offsetKeyDeserializer;
         } else {
             this.offsetKeyDeserializer = config.getConfiguredInstance(WorkerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
-            this.offsetKeyDeserializer.configure(config.getOriginalProperties(), true);
+            this.offsetKeyDeserializer.configure(config.originals(), true);
         }

         if (offsetValueDeserializer != null) {
             this.offsetValueDeserializer = offsetValueDeserializer;
         } else {
             this.offsetValueDeserializer = config.getConfiguredInstance(WorkerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
-            this.offsetValueDeserializer.configure(config.getOriginalProperties(), false);
+            this.offsetValueDeserializer.configure(config.originals(), false);
         }
     }
@@ -185,14 +186,14 @@ public class Worker {
         final WorkerTask workerTask;
         if (task instanceof SourceTask) {
             SourceTask sourceTask = (SourceTask) task;
-            OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.getConnector(),
+            OffsetStorageReader offsetReader = new OffsetStorageReaderImpl<>(offsetBackingStore, id.getConnector(),
                     keyConverter, valueConverter, offsetKeySerializer, offsetValueDeserializer);
-            OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.getConnector(),
+            OffsetStorageWriter<K, V> offsetWriter = new OffsetStorageWriter<>(offsetBackingStore, id.getConnector(),
                     keyConverter, valueConverter, offsetKeySerializer, offsetValueSerializer);
-            workerTask = new WorkerSourceTask(id, sourceTask, keyConverter, valueConverter, producer,
+            workerTask = new WorkerSourceTask<>(id, sourceTask, keyConverter, valueConverter, producer,
                     offsetReader, offsetWriter, config, time);
         } else if (task instanceof SinkTask) {
-            workerTask = new WorkerSinkTask(id, (SinkTask) task, config, keyConverter, valueConverter, time);
+            workerTask = new WorkerSinkTask<>(id, (SinkTask) task, config, keyConverter, valueConverter, time);
         } else {
             log.error("Tasks must be a subclass of either SourceTask or SinkTask", task);
             throw new CopycatException("Tasks must be a subclass of either SourceTask or SinkTask");

View File: WorkerSinkTask.java

@@ -38,21 +38,21 @@ import java.util.concurrent.TimeUnit;
 /**
  * WorkerTask that uses a SinkTask to export data from Kafka.
  */
-public class WorkerSinkTask implements WorkerTask {
+public class WorkerSinkTask<K, V> implements WorkerTask {
     private static final Logger log = LoggerFactory.getLogger(WorkerSinkTask.class);

     private final ConnectorTaskId id;
     private final SinkTask task;
     private final WorkerConfig workerConfig;
     private final Time time;
-    private final Converter keyConverter;
-    private final Converter valueConverter;
+    private final Converter<K> keyConverter;
+    private final Converter<V> valueConverter;
     private WorkerSinkTaskThread workThread;
-    private KafkaConsumer<Object, Object> consumer;
+    private KafkaConsumer<K, V> consumer;
     private final SinkTaskContext context;

     public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConfig,
-                          Converter keyConverter, Converter valueConverter, Time time) {
+                          Converter<K> keyConverter, Converter<V> valueConverter, Time time) {
         this.id = id;
         this.task = task;
         this.workerConfig = workerConfig;
@@ -107,7 +107,7 @@ public class WorkerSinkTask implements WorkerTask {
     public void poll(long timeoutMs) {
         try {
             log.trace("{} polling consumer with timeout {} ms", id, timeoutMs);
-            ConsumerRecords<Object, Object> msgs = consumer.poll(timeoutMs);
+            ConsumerRecords<K, V> msgs = consumer.poll(timeoutMs);
             log.trace("{} polling returned {} messages", id, msgs.count());
             deliverMessages(msgs);
         } catch (ConsumerWakeupException we) {
@@ -153,7 +153,7 @@ public class WorkerSinkTask implements WorkerTask {
         return workerConfig;
     }

-    private KafkaConsumer<Object, Object> createConsumer(Properties taskProps) {
+    private KafkaConsumer<K, V> createConsumer(Properties taskProps) {
         String topicsStr = taskProps.getProperty(SinkTask.TOPICS_CONFIG);
         if (topicsStr == null || topicsStr.isEmpty())
             throw new CopycatRuntimeException("Sink tasks require a list of topics.");
@@ -172,7 +172,7 @@ public class WorkerSinkTask implements WorkerTask {
         props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                 workerConfig.getClass(WorkerConfig.VALUE_DESERIALIZER_CLASS_CONFIG).getName());

-        KafkaConsumer<Object, Object> newConsumer;
+        KafkaConsumer<K, V> newConsumer;
         try {
             newConsumer = new KafkaConsumer<>(props);
         } catch (Throwable t) {
@@ -201,11 +201,11 @@ public class WorkerSinkTask implements WorkerTask {
         return new WorkerSinkTaskThread(this, "WorkerSinkTask-" + id, time, workerConfig);
     }

-    private void deliverMessages(ConsumerRecords<Object, Object> msgs) {
+    private void deliverMessages(ConsumerRecords<K, V> msgs) {
         // Finally, deliver this batch to the sink
         if (msgs.count() > 0) {
             List<SinkRecord> records = new ArrayList<>();
-            for (ConsumerRecord<Object, Object> msg : msgs) {
+            for (ConsumerRecord<K, V> msg : msgs) {
                 log.trace("Consuming message with key {}, value {}", msg.key(), msg.value());
                 records.add(
                         new SinkRecord(msg.topic(), msg.partition(),

View File: WorkerSourceTask.java

@@ -46,33 +46,33 @@ import java.util.concurrent.TimeoutException;
 /**
  * WorkerTask that uses a SourceTask to ingest data into Kafka.
  */
-public class WorkerSourceTask implements WorkerTask {
+public class WorkerSourceTask<K, V> implements WorkerTask {
     private static final Logger log = LoggerFactory.getLogger(WorkerSourceTask.class);

     private ConnectorTaskId id;
     private SourceTask task;
-    private final Converter keyConverter;
-    private final Converter valueConverter;
-    private KafkaProducer<Object, Object> producer;
+    private final Converter<K> keyConverter;
+    private final Converter<V> valueConverter;
+    private KafkaProducer<K, V> producer;
     private WorkerSourceTaskThread workThread;
     private OffsetStorageReader offsetReader;
-    private OffsetStorageWriter offsetWriter;
+    private OffsetStorageWriter<K, V> offsetWriter;
     private final WorkerConfig workerConfig;
     private final Time time;

     // Use IdentityHashMap to ensure correctness with duplicate records. This is a HashMap because
     // there is no IdentityHashSet.
-    private IdentityHashMap<ProducerRecord<Object, Object>, ProducerRecord<Object, Object>>
+    private IdentityHashMap<ProducerRecord<K, V>, ProducerRecord<K, V>>
             outstandingMessages;
     // A second buffer is used while an offset flush is running
-    private IdentityHashMap<ProducerRecord<Object, Object>, ProducerRecord<Object, Object>>
+    private IdentityHashMap<ProducerRecord<K, V>, ProducerRecord<K, V>>
             outstandingMessagesBacklog;
     private boolean flushing;

     public WorkerSourceTask(ConnectorTaskId id, SourceTask task,
-                            Converter keyConverter, Converter valueConverter,
-                            KafkaProducer<Object, Object> producer,
-                            OffsetStorageReader offsetReader, OffsetStorageWriter offsetWriter,
+                            Converter<K> keyConverter, Converter<V> valueConverter,
+                            KafkaProducer<K, V> producer,
+                            OffsetStorageReader offsetReader, OffsetStorageWriter<K, V> offsetWriter,
                             WorkerConfig workerConfig, Time time) {
         this.id = id;
         this.task = task;
@@ -132,7 +132,7 @@ public class WorkerSourceTask implements WorkerTask {
      */
     private synchronized void sendRecords(List<SourceRecord> records) {
         for (SourceRecord record : records) {
-            final ProducerRecord<Object, Object> producerRecord
+            final ProducerRecord<K, V> producerRecord
                     = new ProducerRecord<>(record.getTopic(), record.getKafkaPartition(),
                     keyConverter.fromCopycatData(record.getKey()),
                     valueConverter.fromCopycatData(record.getValue()));
@@ -162,8 +162,8 @@ public class WorkerSourceTask implements WorkerTask {
         }
     }

-    private synchronized void recordSent(final ProducerRecord<Object, Object> record) {
-        ProducerRecord<Object, Object> removed = outstandingMessages.remove(record);
+    private synchronized void recordSent(final ProducerRecord<K, V> record) {
+        ProducerRecord<K, V> removed = outstandingMessages.remove(record);
         // While flushing, we may also see callbacks for items in the backlog
         if (removed == null && flushing)
             removed = outstandingMessagesBacklog.remove(record);
@@ -275,8 +275,7 @@ public class WorkerSourceTask implements WorkerTask {
     private void finishSuccessfulFlush() {
         // If we were successful, we can just swap instead of replacing items back into the original map
-        IdentityHashMap<ProducerRecord<Object, Object>, ProducerRecord<Object, Object>> temp
-                = outstandingMessages;
+        IdentityHashMap<ProducerRecord<K, V>, ProducerRecord<K, V>> temp = outstandingMessages;
         outstandingMessages = outstandingMessagesBacklog;
         outstandingMessagesBacklog = temp;
         flushing = false;

View File: FileConfigStorage.java

@@ -94,10 +94,14 @@ public class FileConfigStorage implements ConfigStorage {
         }
     }

+    @SuppressWarnings("unchecked")
     private void load() {
         try {
             ObjectInputStream is = new ObjectInputStream(new FileInputStream(filename));
-            connectorConfig = (Map<String, Properties>) is.readObject();
+            Object data = is.readObject();
+            if (!(data instanceof Map))
+                throw new CopycatRuntimeException("Expected Map but found " + data.getClass());
+            connectorConfig = (Map<String, Properties>) data;
         } catch (FileNotFoundException e) {
             // Expected on first run
         } catch (IOException | ClassNotFoundException e) {

View File: FileOffsetBackingStore.java

@@ -62,11 +62,14 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
         log.info("Stopped FileOffsetBackingStore");
     }

+    @SuppressWarnings("unchecked")
    private void load() {
         try {
             ObjectInputStream is = new ObjectInputStream(new FileInputStream(file));
-            HashMap<String, Map<byte[], byte[]>> raw
-                    = (HashMap<String, Map<byte[], byte[]>>) is.readObject();
+            Object obj = is.readObject();
+            if (!(obj instanceof HashMap))
+                throw new CopycatRuntimeException("Expected HashMap but found " + obj.getClass());
+            HashMap<String, Map<byte[], byte[]>> raw = (HashMap<String, Map<byte[], byte[]>>) obj;
             data = new HashMap<>();
             for (Map.Entry<String, Map<byte[], byte[]>> entry : raw.entrySet()) {
                 HashMap<ByteBuffer, ByteBuffer> converted = new HashMap<>();

View File: OffsetStorageReaderImpl.java

@@ -34,20 +34,19 @@ import java.util.Map;
  * directly, the interface is only separate from this implementation because it needs to be
  * included in the public API package.
  */
-public class OffsetStorageReaderImpl implements OffsetStorageReader {
+public class OffsetStorageReaderImpl<K, V> implements OffsetStorageReader {
     private static final Logger log = LoggerFactory.getLogger(OffsetStorageReaderImpl.class);

     private final OffsetBackingStore backingStore;
     private final String namespace;
-    private final Converter keyConverter;
-    private final Converter valueConverter;
-    private final Serializer keySerializer;
-    private final Deserializer valueDeserializer;
+    private final Converter<K> keyConverter;
+    private final Converter<V> valueConverter;
+    private final Serializer<K> keySerializer;
+    private final Deserializer<V> valueDeserializer;

     public OffsetStorageReaderImpl(OffsetBackingStore backingStore, String namespace,
-                                   Converter keyConverter, Converter valueConverter,
-                                   Serializer keySerializer,
-                                   Deserializer valueDeserializer) {
+                                   Converter<K> keyConverter, Converter<V> valueConverter,
+                                   Serializer<K> keySerializer, Deserializer<V> valueDeserializer) {
         this.backingStore = backingStore;
         this.namespace = namespace;
         this.keyConverter = keyConverter;

View File: OffsetStorageWriter.java

@@ -63,15 +63,16 @@ import java.util.concurrent.Future;
  * This class is not thread-safe. It should only be accessed from a Task's processing thread.
  * </p>
  */
-public class OffsetStorageWriter {
+public class OffsetStorageWriter<K, V> {
     private static final Logger log = LoggerFactory.getLogger(OffsetStorageWriter.class);

     private final OffsetBackingStore backingStore;
-    private final Converter keyConverter;
-    private final Converter valueConverter;
-    private final Serializer keySerializer;
-    private final Serializer valueSerializer;
+    private final Converter<K> keyConverter;
+    private final Converter<V> valueConverter;
+    private final Serializer<K> keySerializer;
+    private final Serializer<V> valueSerializer;
     private final String namespace;
+    // Offset data in Copycat format
     private Map<Object, Object> data = new HashMap<>();

     // Not synchronized, should only be accessed by flush thread
@@ -80,8 +81,8 @@ public class OffsetStorageWriter {
     private long currentFlushId = 0;

     public OffsetStorageWriter(OffsetBackingStore backingStore,
-                               String namespace, Converter keyConverter, Converter valueConverter,
-                               Serializer keySerializer, Serializer valueSerializer) {
+                               String namespace, Converter<K> keyConverter, Converter<V> valueConverter,
+                               Serializer<K> keySerializer, Serializer<V> valueSerializer) {
         this.backingStore = backingStore;
         this.namespace = namespace;
         this.keyConverter = keyConverter;
@@ -90,6 +91,11 @@ public class OffsetStorageWriter {
         this.valueSerializer = valueSerializer;
     }

+    /**
+     * Set an offset for a partition using Copycat data values
+     * @param partition the partition to store an offset for
+     * @param offset the offset
+     */
     public synchronized void setOffset(Object partition, Object offset) {
         data.put(partition, offset);
     }
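
A usage sketch for the newly documented method (a fragment; assumes an OffsetStorageWriter<K, V> named offsetWriter is in scope, and the partition and offset values are invented). The task hands over Copycat-format values; conversion and serialization happen later, at flush time:

    // Record the task's position in Copycat format; the configured Converter and
    // Serializer are applied only when the offsets are flushed.
    offsetWriter.setOffset("file:/tmp/input.txt", 42L);
    if (offsetWriter.beginFlush()) {
        // doFlush(callback) then hands the serialized offsets to the backing store
    }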

View File: FutureCallback.java

@@ -21,12 +21,12 @@ import java.util.concurrent.*;

 public class FutureCallback<T> implements Callback<T>, Future<T> {
-    private Callback underlying;
+    private Callback<T> underlying;
     private CountDownLatch finishedLatch;
     private T result = null;
     private Throwable exception = null;

-    public FutureCallback(Callback underlying) {
+    public FutureCallback(Callback<T> underlying) {
         this.underlying = underlying;
         this.finishedLatch = new CountDownLatch(1);
     }

View File: WorkerSinkTaskTest.java

@@ -32,6 +32,7 @@ import org.easymock.*;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
@@ -46,33 +47,36 @@ import static org.junit.Assert.assertEquals;
 @PowerMockIgnore("javax.management.*")
 public class WorkerSinkTaskTest extends ThreadedTest {
-    // These are fixed to keep this code simpler
+    // These are fixed to keep this code simpler. In this example we assume byte[] raw values
+    // with mix of integer/string in Copycat
     private static final String TOPIC = "test";
     private static final int PARTITION = 12;
     private static final long FIRST_OFFSET = 45;
-    private static final String KEY = "KEY";
+    private static final int KEY = 12;
     private static final String VALUE = "VALUE";
-    private static final String TOPIC_PARTITION_STR = "test-12";
+    private static final byte[] RAW_KEY = "key".getBytes();
+    private static final byte[] RAW_VALUE = "value".getBytes();
     private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, PARTITION);

     private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
     private Time time;
-    private SinkTask sinkTask;
+    @Mock private SinkTask sinkTask;
     private WorkerConfig workerConfig;
-    private Converter keyConverter;
-    private Converter valueConverter;
-    private WorkerSinkTask workerTask;
-    private KafkaConsumer<Object, Object> consumer;
+    @Mock private Converter<byte[]> keyConverter;
+    @Mock
+    private Converter<byte[]> valueConverter;
+    private WorkerSinkTask<Integer, String> workerTask;
+    @Mock private KafkaConsumer<byte[], byte[]> consumer;
     private WorkerSinkTaskThread workerThread;
     private long recordsReturned;

+    @SuppressWarnings("unchecked")
     @Override
     public void setup() {
         super.setup();
         time = new MockTime();
-        sinkTask = PowerMock.createMock(SinkTask.class);
         Properties workerProps = new Properties();
         workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
         workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
@@ -81,8 +85,6 @@ public class WorkerSinkTaskTest extends ThreadedTest {
         workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
         workerProps.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
         workerConfig = new WorkerConfig(workerProps);
-        keyConverter = PowerMock.createMock(Converter.class);
-        valueConverter = PowerMock.createMock(Converter.class);
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"},
                 taskId, sinkTask, workerConfig, keyConverter, valueConverter, time);
@@ -129,19 +131,15 @@ public class WorkerSinkTaskTest extends ThreadedTest {
     public void testDeliverConvertsData() throws Exception {
         // Validate conversion is performed when data is delivered
         Integer record = 12;
-        byte[] rawKey = "key".getBytes(), rawValue = "value".getBytes();

-        ConsumerRecords<Object, Object> records = new ConsumerRecords<>(
+        ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
                 Collections.singletonMap(
                         new TopicPartition("topic", 0),
-                        Collections.singletonList(
-                                new ConsumerRecord<Object, Object>("topic", 0, 0, rawKey, rawValue))));
+                        Collections.singletonList(new ConsumerRecord<>("topic", 0, 0, RAW_KEY, RAW_VALUE))));

         // Exact data doesn't matter, but should be passed directly to sink task
-        EasyMock.expect(keyConverter.toCopycatData(rawKey))
-                .andReturn(record);
-        EasyMock.expect(valueConverter.toCopycatData(rawValue))
-                .andReturn(record);
+        EasyMock.expect(keyConverter.toCopycatData(RAW_KEY)).andReturn(record);
+        EasyMock.expect(valueConverter.toCopycatData(RAW_VALUE)).andReturn(record);

         Capture<Collection<SinkRecord>> capturedRecords
                 = EasyMock.newCapture(CaptureType.ALL);
         sinkTask.put(EasyMock.capture(capturedRecords));
@@ -262,14 +260,13 @@ public class WorkerSinkTaskTest extends ThreadedTest {
         PowerMock.verifyAll();
     }

-    private KafkaConsumer<Object, Object> expectInitializeTask(Properties taskProps)
+    private KafkaConsumer<byte[], byte[]> expectInitializeTask(Properties taskProps)
             throws Exception {
         sinkTask.initialize(EasyMock.anyObject(SinkTaskContext.class));
         PowerMock.expectLastCall();
         sinkTask.start(taskProps);
         PowerMock.expectLastCall();

-        consumer = PowerMock.createMock(KafkaConsumer.class);
         PowerMock.expectPrivate(workerTask, "createConsumer", taskProps)
                 .andReturn(consumer);
         workerThread = PowerMock.createPartialMock(WorkerSinkTaskThread.class, new String[]{"start"},
@@ -302,22 +299,23 @@ public class WorkerSinkTaskTest extends ThreadedTest {
         // Stub out all the consumer stream/iterator responses, which we just want to verify occur,
         // but don't care about the exact details here.
         EasyMock.expect(consumer.poll(EasyMock.anyLong())).andStubAnswer(
-                new IAnswer<ConsumerRecords<Object, Object>>() {
+                new IAnswer<ConsumerRecords<byte[], byte[]>>() {
                     @Override
-                    public ConsumerRecords<Object, Object> answer() throws Throwable {
+                    public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
                         // "Sleep" so time will progress
                         time.sleep(pollDelayMs);
-                        ConsumerRecords<Object, Object> records = new ConsumerRecords<>(
-                                Collections.singletonMap(new TopicPartition(TOPIC, PARTITION), Arrays.asList(
-                                        new ConsumerRecord<Object, Object>(TOPIC, PARTITION,
-                                                FIRST_OFFSET + recordsReturned, KEY,
-                                                VALUE))));
+                        ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
+                                Collections.singletonMap(
+                                        new TopicPartition(TOPIC, PARTITION),
+                                        Arrays.asList(
+                                                new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, RAW_KEY, RAW_VALUE)
+                                        )));
                         recordsReturned++;
                         return records;
                     }
                 });
-        EasyMock.expect(keyConverter.toCopycatData(KEY)).andReturn(KEY).anyTimes();
-        EasyMock.expect(valueConverter.toCopycatData(VALUE)).andReturn(VALUE).anyTimes();
+        EasyMock.expect(keyConverter.toCopycatData(RAW_KEY)).andReturn(KEY).anyTimes();
+        EasyMock.expect(valueConverter.toCopycatData(RAW_VALUE)).andReturn(VALUE).anyTimes();
         Capture<Collection<SinkRecord>> capturedRecords = EasyMock.newCapture(CaptureType.ALL);
         sinkTask.put(EasyMock.capture(capturedRecords));
         EasyMock.expectLastCall().anyTimes();

View File: WorkerSourceTaskTest.java

@@ -37,9 +37,13 @@ import org.easymock.EasyMock;
 import org.easymock.IAnswer;
 import org.easymock.IExpectationSetters;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.reflect.Whitebox;

+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -51,27 +55,29 @@ import java.util.concurrent.TimeoutException;
 import static org.junit.Assert.*;

+@RunWith(PowerMockRunner.class)
 public class WorkerSourceTaskTest extends ThreadedTest {
     private static final byte[] PARTITION_BYTES = "partition".getBytes();
     private static final byte[] OFFSET_BYTES = "offset-1".getBytes();

+    // Copycat-format data
     private static final Integer KEY = -1;
-    private static final Integer RECORD = 12;
-    // The actual format of this data doesn't matter -- we just want to see that the right version
+    private static final Long RECORD = 12L;
+    // Native-formatted data. The actual format of this data doesn't matter -- we just want to see that the right version
     // is used in the right place.
-    private static final String CONVERTED_KEY = "converted-key";
+    private static final ByteBuffer CONVERTED_KEY = ByteBuffer.wrap("converted-key".getBytes());
     private static final String CONVERTED_RECORD = "converted-record";

     private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
     private WorkerConfig config;
-    private SourceTask sourceTask;
-    private Converter keyConverter;
-    private Converter valueConverter;
-    private KafkaProducer<Object, Object> producer;
-    private OffsetStorageReader offsetReader;
-    private OffsetStorageWriter offsetWriter;
-    private WorkerSourceTask workerTask;
-    private Future<RecordMetadata> sendFuture;
+    @Mock private SourceTask sourceTask;
+    @Mock private Converter<ByteBuffer> keyConverter;
+    @Mock private Converter<String> valueConverter;
+    @Mock private KafkaProducer<ByteBuffer, String> producer;
+    @Mock private OffsetStorageReader offsetReader;
+    @Mock private OffsetStorageWriter<ByteBuffer, String> offsetWriter;
+    private WorkerSourceTask<ByteBuffer, String> workerTask;
+    @Mock private Future<RecordMetadata> sendFuture;
     private Capture<org.apache.kafka.clients.producer.Callback> producerCallbacks;
@@ -91,18 +97,11 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
         workerProps.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
         config = new WorkerConfig(workerProps);
-        sourceTask = PowerMock.createMock(SourceTask.class);
-        keyConverter = PowerMock.createMock(Converter.class);
-        valueConverter = PowerMock.createMock(Converter.class);
-        producer = PowerMock.createMock(KafkaProducer.class);
-        offsetReader = PowerMock.createMock(OffsetStorageReader.class);
-        offsetWriter = PowerMock.createMock(OffsetStorageWriter.class);
-        sendFuture = PowerMock.createMock(Future.class);
         producerCallbacks = EasyMock.newCapture();
     }

     private void createWorkerTask() {
-        workerTask = new WorkerSourceTask(taskId, sourceTask, keyConverter, valueConverter, producer,
+        workerTask = new WorkerSourceTask<>(taskId, sourceTask, keyConverter, valueConverter, producer,
                 offsetReader, offsetWriter, config, new SystemTime());
     }
@@ -198,7 +197,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         // Can just use the same record for key and value
         records.add(new SourceRecord(PARTITION_BYTES, OFFSET_BYTES, "topic", null, KEY, RECORD));

-        Capture<ProducerRecord> sent = expectSendRecord();
+        Capture<ProducerRecord<ByteBuffer, String>> sent = expectSendRecord();

         PowerMock.replayAll();
@@ -228,11 +227,11 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         return latch;
     }

-    private Capture<ProducerRecord> expectSendRecord() throws InterruptedException {
+    private Capture<ProducerRecord<ByteBuffer, String>> expectSendRecord() throws InterruptedException {
         EasyMock.expect(keyConverter.fromCopycatData(KEY)).andStubReturn(CONVERTED_KEY);
         EasyMock.expect(valueConverter.fromCopycatData(RECORD)).andStubReturn(CONVERTED_RECORD);

-        Capture<ProducerRecord> sent = EasyMock.newCapture();
+        Capture<ProducerRecord<ByteBuffer, String>> sent = EasyMock.newCapture();
         // 1. Converted data passed to the producer, which will need callbacks invoked for flush to work
         EasyMock.expect(
                 producer.send(EasyMock.capture(sent),
@@ -260,11 +259,11 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         latch.await(1000, TimeUnit.MILLISECONDS);
     }

+    @SuppressWarnings("unchecked")
     private void expectOffsetFlush(boolean succeed) throws Exception {
         EasyMock.expect(offsetWriter.beginFlush()).andReturn(true);
         Future<Void> flushFuture = PowerMock.createMock(Future.class);
-        EasyMock.expect(offsetWriter.doFlush(EasyMock.anyObject(Callback.class)))
-                .andReturn(flushFuture);
+        EasyMock.expect(offsetWriter.doFlush(EasyMock.anyObject(Callback.class))).andReturn(flushFuture);
         // Should throw for failure
         IExpectationSetters<Void> futureGetExpect = EasyMock.expect(
                 flushFuture.get(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class)));

View File: WorkerTest.java

@@ -82,8 +82,7 @@ public class WorkerTest extends ThreadedTest {
         WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);

         PowerMock.mockStatic(Worker.class);
-        PowerMock.expectPrivate(Worker.class, "instantiateTask", TestSourceTask.class.getName())
-                .andReturn(task);
+        PowerMock.expectPrivate(Worker.class, "instantiateTask", TestSourceTask.class.getName()).andReturn(task);

         PowerMock.expectNew(
                 WorkerSourceTask.class, EasyMock.eq(taskId), EasyMock.eq(task),
@@ -128,8 +127,7 @@ public class WorkerTest extends ThreadedTest {
         WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);

         PowerMock.mockStatic(Worker.class);
-        PowerMock.expectPrivate(Worker.class, "instantiateTask", TestSourceTask.class.getName())
-                .andReturn(task);
+        PowerMock.expectPrivate(Worker.class, "instantiateTask", TestSourceTask.class.getName()).andReturn(task);

         PowerMock.expectNew(
                 WorkerSourceTask.class, EasyMock.eq(taskId), EasyMock.eq(task),
@@ -162,7 +160,7 @@ public class WorkerTest extends ThreadedTest {
     }

-    private static class TestSourceTask extends SourceTask<Object, Object> {
+    private static class TestSourceTask extends SourceTask {
         public TestSourceTask() {
         }

View File: StandaloneCoordinatorRestoreTest.java

@@ -18,9 +18,7 @@
 package org.apache.kafka.copycat.runtime.standalone;

 import org.apache.kafka.copycat.runtime.ConnectorConfig;
-import org.apache.kafka.copycat.runtime.Worker;
 import org.apache.kafka.copycat.sink.SinkConnector;
-import org.apache.kafka.copycat.util.Callback;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -41,7 +39,6 @@ public class StandaloneCoordinatorRestoreTest extends StandaloneCoordinatorTestBase {
     @Before
     public void setup() throws Exception {
-        worker = PowerMock.createMock(Worker.class);
         Properties coordinatorProps = new Properties();
         coordinatorProps.setProperty(StandaloneCoordinator.STORAGE_CONFIG,
                 FileConfigStorage.class.getName());
@@ -50,7 +47,6 @@ public class StandaloneCoordinatorRestoreTest extends StandaloneCoordinatorTestBase {
         coordinatorProps.setProperty(FileConfigStorage.FILE_CONFIG,
                 coordinatorConfigFile.getAbsolutePath());
         coordinator = new StandaloneCoordinator(worker, coordinatorProps);
-        createCallback = PowerMock.createMock(Callback.class);

         connectorProps = new Properties();
         connectorProps.setProperty(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);

View File: StandaloneCoordinatorTest.java

@@ -42,7 +42,6 @@ public class StandaloneCoordinatorTest extends StandaloneCoordinatorTestBase {
     public void setup() {
         worker = PowerMock.createMock(Worker.class);
         coordinator = new StandaloneCoordinator(worker, new Properties());
-        createCallback = PowerMock.createMock(Callback.class);

         connectorProps = new Properties();
         connectorProps.setProperty(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
@@ -85,9 +84,9 @@ public class StandaloneCoordinatorTest extends StandaloneCoordinatorTestBase {
         PowerMock.replayAll();

         coordinator.addConnector(connectorProps, createCallback);

-        FutureCallback<Void> futureCb = new FutureCallback<>(new Callback() {
+        FutureCallback<Void> futureCb = new FutureCallback<>(new Callback<Void>() {
             @Override
-            public void onCompletion(Throwable error, Object result) {
+            public void onCompletion(Throwable error, Void result) {
             }
         });

View File: StandaloneCoordinatorTestBase.java

@@ -30,6 +30,7 @@ import org.apache.kafka.copycat.util.Callback;
 import org.apache.kafka.copycat.util.ConnectorTaskId;
 import org.easymock.EasyMock;
 import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;

 import java.util.Arrays;
 import java.util.Properties;
@@ -40,9 +41,9 @@ public class StandaloneCoordinatorTestBase {
     protected static final String TOPICS_LIST_STR = "topic1,topic2";

     protected StandaloneCoordinator coordinator;
-    protected Worker worker;
+    @Mock protected Worker worker;
     protected Connector connector;
-    protected Callback<String> createCallback;
+    @Mock protected Callback<String> createCallback;

     protected Properties connectorProps;
     protected Properties taskProps;

View File: FileOffsetBackingStoreTest.java

@@ -71,8 +71,7 @@ public class FileOffsetBackingStoreTest {
         store.set("namespace", firstSet, setCallback).get();

-        Map<ByteBuffer, ByteBuffer> values
-                = store.get("namespace", Arrays.asList(buffer("key"), buffer("bad")), getCallback).get();
+        Map<ByteBuffer, ByteBuffer> values = store.get("namespace", Arrays.asList(buffer("key"), buffer("bad")), getCallback).get();
         assertEquals(buffer("value"), values.get(buffer("key")));
         assertEquals(null, values.get(buffer("bad")));
@@ -92,8 +91,7 @@ public class FileOffsetBackingStoreTest {
         FileOffsetBackingStore restore = new FileOffsetBackingStore();
         restore.configure(props);
         restore.start();
-        Map<ByteBuffer, ByteBuffer> values
-                = restore.get("namespace", Arrays.asList(buffer("key")), getCallback).get();
+        Map<ByteBuffer, ByteBuffer> values = restore.get("namespace", Arrays.asList(buffer("key")), getCallback).get();
         assertEquals(buffer("value"), values.get(buffer("key")));

         PowerMock.verifyAll();
@@ -104,15 +102,16 @@ public class FileOffsetBackingStoreTest {
     }

     private Callback<Void> expectSuccessfulSetCallback() {
+        @SuppressWarnings("unchecked")
         Callback<Void> setCallback = PowerMock.createMock(Callback.class);
-        setCallback.onCompletion(null, null);
+        setCallback.onCompletion(EasyMock.isNull(Throwable.class), EasyMock.isNull(Void.class));
         PowerMock.expectLastCall();
         return setCallback;
     }

+    @SuppressWarnings("unchecked")
     private Callback<Map<ByteBuffer, ByteBuffer>> expectSuccessfulGetCallback() {
-        Callback<Map<ByteBuffer, ByteBuffer>> getCallback
-                = PowerMock.createMock(Callback.class);
+        Callback<Map<ByteBuffer, ByteBuffer>> getCallback = PowerMock.createMock(Callback.class);
         getCallback.onCompletion(EasyMock.isNull(Throwable.class), EasyMock.anyObject(Map.class));
         PowerMock.expectLastCall();
         return getCallback;

View File

@ -26,34 +26,43 @@ import org.easymock.IAnswer;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock; import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
import org.powermock.modules.junit4.PowerMockRunner;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.*; import java.util.concurrent.*;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@RunWith(PowerMockRunner.class)
public class OffsetStorageWriterTest { public class OffsetStorageWriterTest {
private static final String NAMESPACE = "namespace"; private static final String NAMESPACE = "namespace";
private static final String OFFSET_KEY = "key"; // Copycat format - any types should be accepted here
private static final List<String> OFFSET_KEY = Arrays.asList("key", "key");
private static final String OFFSET_VALUE = "value"; private static final String OFFSET_VALUE = "value";
private static final String OFFSET_KEY_CONVERTED = "key-converted"; // Native objects - must match serializer types
private static final int OFFSET_KEY_CONVERTED = 12;
private static final String OFFSET_VALUE_CONVERTED = "value-converted"; private static final String OFFSET_VALUE_CONVERTED = "value-converted";
// Serialized
private static final byte[] OFFSET_KEY_SERIALIZED = "key-serialized".getBytes(); private static final byte[] OFFSET_KEY_SERIALIZED = "key-serialized".getBytes();
private static final byte[] OFFSET_VALUE_SERIALIZED = "value-serialized".getBytes(); private static final byte[] OFFSET_VALUE_SERIALIZED = "value-serialized".getBytes();
private static final Map<ByteBuffer, ByteBuffer> OFFSETS_SERIALIZED private static final Map<ByteBuffer, ByteBuffer> OFFSETS_SERIALIZED
= Collections.singletonMap(ByteBuffer.wrap(OFFSET_KEY_SERIALIZED), = Collections.singletonMap(ByteBuffer.wrap(OFFSET_KEY_SERIALIZED),
ByteBuffer.wrap(OFFSET_VALUE_SERIALIZED)); ByteBuffer.wrap(OFFSET_VALUE_SERIALIZED));
private OffsetBackingStore store; @Mock private OffsetBackingStore store;
private Converter keyConverter; @Mock private Converter<Integer> keyConverter;
private Converter valueConverter; @Mock private Converter<String> valueConverter;
private Serializer keySerializer; @Mock private Serializer<Integer> keySerializer;
private Serializer valueSerializer; @Mock private Serializer<String> valueSerializer;
private OffsetStorageWriter writer; private OffsetStorageWriter<Integer, String> writer;
private static Exception exception = new RuntimeException("error"); private static Exception exception = new RuntimeException("error");
@ -61,13 +70,7 @@ public class OffsetStorageWriterTest {
@Before @Before
public void setup() { public void setup() {
store = PowerMock.createMock(OffsetBackingStore.class); writer = new OffsetStorageWriter<>(store, NAMESPACE, keyConverter, valueConverter, keySerializer, valueSerializer);
keyConverter = PowerMock.createMock(Converter.class);
valueConverter = PowerMock.createMock(Converter.class);
keySerializer = PowerMock.createMock(Serializer.class);
valueSerializer = PowerMock.createMock(Serializer.class);
writer = new OffsetStorageWriter(store, NAMESPACE, keyConverter, valueConverter, keySerializer, valueSerializer);
service = Executors.newFixedThreadPool(1); service = Executors.newFixedThreadPool(1);
} }
@ -78,6 +81,7 @@ public class OffsetStorageWriterTest {
@Test @Test
public void testWriteFlush() throws Exception { public void testWriteFlush() throws Exception {
@SuppressWarnings("unchecked")
Callback<Void> callback = PowerMock.createMock(Callback.class); Callback<Void> callback = PowerMock.createMock(Callback.class);
expectStore(callback, false); expectStore(callback, false);
@ -109,6 +113,7 @@ public class OffsetStorageWriterTest {
// When a flush fails, we shouldn't just lose the offsets. Instead, they should be restored // When a flush fails, we shouldn't just lose the offsets. Instead, they should be restored
// such that a subsequent flush will write them. // such that a subsequent flush will write them.
@SuppressWarnings("unchecked")
final Callback<Void> callback = PowerMock.createMock(Callback.class); final Callback<Void> callback = PowerMock.createMock(Callback.class);
// First time the write fails // First time the write fails
expectStore(callback, true); expectStore(callback, true);
@ -130,6 +135,7 @@ public class OffsetStorageWriterTest {
@Test(expected = CopycatRuntimeException.class) @Test(expected = CopycatRuntimeException.class)
public void testAlreadyFlushing() throws Exception { public void testAlreadyFlushing() throws Exception {
@SuppressWarnings("unchecked")
final Callback<Void> callback = PowerMock.createMock(Callback.class); final Callback<Void> callback = PowerMock.createMock(Callback.class);
// Trigger the send, but don't invoke the callback so we'll still be mid-flush // Trigger the send, but don't invoke the callback so we'll still be mid-flush
CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1); CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1);
@ -158,6 +164,7 @@ public class OffsetStorageWriterTest {
@Test @Test
public void testCancelAfterAwaitFlush() throws Exception { public void testCancelAfterAwaitFlush() throws Exception {
@SuppressWarnings("unchecked")
Callback<Void> callback = PowerMock.createMock(Callback.class); Callback<Void> callback = PowerMock.createMock(Callback.class);
CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1); CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1);
// In this test, the write should be cancelled so the callback will not be invoked and is not // In this test, the write should be cancelled so the callback will not be invoked and is not