Make Converter generic to match serializers since some serialization formats do not require a base class of Object; update many other classes to have generic key and value class type parameters to match this change.

This commit is contained in:
Ewen Cheslack-Postava 2015-08-01 23:23:50 -06:00
parent b194c7348f
commit 6787a85e87
22 changed files with 173 additions and 175 deletions

View File

@ -24,7 +24,7 @@ import java.util.List;
/**
* SourceTask is a Task that pulls records from another system for storage in Kafka.
*/
public abstract class SourceTask<K, V> implements Task {
public abstract class SourceTask implements Task {
protected SourceTaskContext context;

View File

@ -24,19 +24,19 @@ package org.apache.kafka.copycat.storage;
* component -- the producer or consumer serializer or deserializer for records or a Copycat
* serializer or deserializer for offsets.
*/
public interface Converter {
public interface Converter<T> {
/**
* Convert a Copycat data object to a native object for serialization.
* @param value
* @return
*/
Object fromCopycatData(Object value);
T fromCopycatData(Object value);
/**
* Convert a native object to a Copycat data object.
* @param value
* @return
*/
Object toCopycatData(Object value);
Object toCopycatData(T value);
}

View File

@ -32,7 +32,7 @@ import java.util.Properties;
/**
* FileStreamSourceTask reads from stdin or a file.
*/
public class FileStreamSourceTask extends SourceTask<Object, Object> {
public class FileStreamSourceTask extends SourceTask {
private static final Logger log = LoggerFactory.getLogger(FileStreamSourceTask.class);
private InputStream stream;

View File

@ -32,7 +32,7 @@ import java.util.*;
/**
* Implementation of Converter that uses JSON to store schemas and objects.
*/
public class JsonConverter implements Converter {
public class JsonConverter implements Converter<JsonNode> {
private static final HashMap<String, JsonToCopycatTypeConverter> TO_COPYCAT_CONVERTERS
= new HashMap<>();
@ -106,15 +106,11 @@ public class JsonConverter implements Converter {
}
@Override
public Object toCopycatData(Object value) {
if (!(value instanceof JsonNode))
throw new CopycatRuntimeException("JsonConvert can only convert JsonNode objects.");
public Object toCopycatData(JsonNode value) {
if (!value.isObject() || value.size() != 2 || !value.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !value.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))
throw new CopycatRuntimeException("JSON value converted to Copycat must be in envelope containing schema");
JsonNode data = (JsonNode) value;
if (!data.isObject() || data.size() != 2 || !data.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !data.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))
throw new CopycatRuntimeException("JSON data converted to Copycat must be in envelope containing schema");
return convertToCopycat(data.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME), data.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
return convertToCopycat(value.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME), value.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
}

View File

@ -71,7 +71,7 @@ public class Copycat {
try {
// Destroy any requested connectors
for (final String connName : config.getList(CopycatConfig.DELETE_CONNECTORS_CONFIG)) {
FutureCallback cb = new FutureCallback(new Callback<Void>() {
FutureCallback<Void> cb = new FutureCallback<>(new Callback<Void>() {
@Override
public void onCompletion(Throwable error, Void result) {
if (error != null)
@ -85,7 +85,7 @@ public class Copycat {
// Create any new connectors
for (final String connectorPropsFile : config.getList(CopycatConfig.CREATE_CONNECTORS_CONFIG)) {
connectorProps = Utils.loadProps(connectorPropsFile);
FutureCallback cb = new FutureCallback(new Callback<String>() {
FutureCallback<String> cb = new FutureCallback<>(new Callback<String>() {
@Override
public void onCompletion(Throwable error, String id) {
if (error != null)

View File

@ -128,27 +128,20 @@ public class WorkerConfig extends AbstractConfig {
Importance.LOW, OFFSET_COMMIT_TIMEOUT_MS_DOC);
}
private Properties originalProperties;
public WorkerConfig() {
this(new Properties());
}
public WorkerConfig(Properties props) {
super(config, props);
this.originalProperties = props;
}
public Properties getUnusedProperties() {
Set<String> unusedKeys = this.unused();
Properties unusedProps = new Properties();
for (String key : unusedKeys) {
unusedProps.setProperty(key, originalProperties.getProperty(key));
unusedProps.put(key, this.originals().get(key));
}
return unusedProps;
}
public Properties getOriginalProperties() {
return originalProperties;
}
}

View File

@ -48,20 +48,20 @@ import java.util.Properties;
* Since each task has a dedicated thread, this is mainly just a container for them.
* </p>
*/
public class Worker {
public class Worker<K, V> {
private static final Logger log = LoggerFactory.getLogger(Worker.class);
private Time time;
private WorkerConfig config;
private Converter keyConverter;
private Converter valueConverter;
private Converter<K> keyConverter;
private Converter<V> valueConverter;
private OffsetBackingStore offsetBackingStore;
private Serializer offsetKeySerializer;
private Serializer offsetValueSerializer;
private Deserializer offsetKeyDeserializer;
private Deserializer offsetValueDeserializer;
private Serializer<K> offsetKeySerializer;
private Serializer<V> offsetValueSerializer;
private Deserializer<K> offsetKeyDeserializer;
private Deserializer<V> offsetValueDeserializer;
private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>();
private KafkaProducer producer;
private KafkaProducer<K, V> producer;
private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
public Worker(WorkerConfig config) {
@ -70,6 +70,7 @@ public class Worker {
null, null, null, null);
}
@SuppressWarnings("unchecked")
public Worker(Time time, WorkerConfig config, OffsetBackingStore offsetBackingStore,
Serializer offsetKeySerializer, Serializer offsetValueSerializer,
Deserializer offsetKeyDeserializer, Deserializer offsetValueDeserializer) {
@ -83,28 +84,28 @@ public class Worker {
this.offsetKeySerializer = offsetKeySerializer;
} else {
this.offsetKeySerializer = config.getConfiguredInstance(WorkerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
this.offsetKeySerializer.configure(config.getOriginalProperties(), true);
this.offsetKeySerializer.configure(config.originals(), true);
}
if (offsetValueSerializer != null) {
this.offsetValueSerializer = offsetValueSerializer;
} else {
this.offsetValueSerializer = config.getConfiguredInstance(WorkerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
this.offsetValueSerializer.configure(config.getOriginalProperties(), false);
this.offsetValueSerializer.configure(config.originals(), false);
}
if (offsetKeyDeserializer != null) {
this.offsetKeyDeserializer = offsetKeyDeserializer;
} else {
this.offsetKeyDeserializer = config.getConfiguredInstance(WorkerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
this.offsetKeyDeserializer.configure(config.getOriginalProperties(), true);
this.offsetKeyDeserializer.configure(config.originals(), true);
}
if (offsetValueDeserializer != null) {
this.offsetValueDeserializer = offsetValueDeserializer;
} else {
this.offsetValueDeserializer = config.getConfiguredInstance(WorkerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
this.offsetValueDeserializer.configure(config.getOriginalProperties(), false);
this.offsetValueDeserializer.configure(config.originals(), false);
}
}
@ -185,14 +186,14 @@ public class Worker {
final WorkerTask workerTask;
if (task instanceof SourceTask) {
SourceTask sourceTask = (SourceTask) task;
OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.getConnector(),
OffsetStorageReader offsetReader = new OffsetStorageReaderImpl<>(offsetBackingStore, id.getConnector(),
keyConverter, valueConverter, offsetKeySerializer, offsetValueDeserializer);
OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.getConnector(),
OffsetStorageWriter<K, V> offsetWriter = new OffsetStorageWriter<>(offsetBackingStore, id.getConnector(),
keyConverter, valueConverter, offsetKeySerializer, offsetValueSerializer);
workerTask = new WorkerSourceTask(id, sourceTask, keyConverter, valueConverter, producer,
workerTask = new WorkerSourceTask<>(id, sourceTask, keyConverter, valueConverter, producer,
offsetReader, offsetWriter, config, time);
} else if (task instanceof SinkTask) {
workerTask = new WorkerSinkTask(id, (SinkTask) task, config, keyConverter, valueConverter, time);
workerTask = new WorkerSinkTask<>(id, (SinkTask) task, config, keyConverter, valueConverter, time);
} else {
log.error("Tasks must be a subclass of either SourceTask or SinkTask", task);
throw new CopycatException("Tasks must be a subclass of either SourceTask or SinkTask");

View File

@ -38,21 +38,21 @@ import java.util.concurrent.TimeUnit;
/**
* WorkerTask that uses a SinkTask to export data from Kafka.
*/
public class WorkerSinkTask implements WorkerTask {
public class WorkerSinkTask<K, V> implements WorkerTask {
private static final Logger log = LoggerFactory.getLogger(WorkerSinkTask.class);
private final ConnectorTaskId id;
private final SinkTask task;
private final WorkerConfig workerConfig;
private final Time time;
private final Converter keyConverter;
private final Converter valueConverter;
private final Converter<K> keyConverter;
private final Converter<V> valueConverter;
private WorkerSinkTaskThread workThread;
private KafkaConsumer<Object, Object> consumer;
private KafkaConsumer<K, V> consumer;
private final SinkTaskContext context;
public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConfig,
Converter keyConverter, Converter valueConverter, Time time) {
Converter<K> keyConverter, Converter<V> valueConverter, Time time) {
this.id = id;
this.task = task;
this.workerConfig = workerConfig;
@ -107,7 +107,7 @@ public class WorkerSinkTask implements WorkerTask {
public void poll(long timeoutMs) {
try {
log.trace("{} polling consumer with timeout {} ms", id, timeoutMs);
ConsumerRecords<Object, Object> msgs = consumer.poll(timeoutMs);
ConsumerRecords<K, V> msgs = consumer.poll(timeoutMs);
log.trace("{} polling returned {} messages", id, msgs.count());
deliverMessages(msgs);
} catch (ConsumerWakeupException we) {
@ -153,7 +153,7 @@ public class WorkerSinkTask implements WorkerTask {
return workerConfig;
}
private KafkaConsumer<Object, Object> createConsumer(Properties taskProps) {
private KafkaConsumer<K, V> createConsumer(Properties taskProps) {
String topicsStr = taskProps.getProperty(SinkTask.TOPICS_CONFIG);
if (topicsStr == null || topicsStr.isEmpty())
throw new CopycatRuntimeException("Sink tasks require a list of topics.");
@ -172,7 +172,7 @@ public class WorkerSinkTask implements WorkerTask {
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
workerConfig.getClass(WorkerConfig.VALUE_DESERIALIZER_CLASS_CONFIG).getName());
KafkaConsumer<Object, Object> newConsumer;
KafkaConsumer<K, V> newConsumer;
try {
newConsumer = new KafkaConsumer<>(props);
} catch (Throwable t) {
@ -201,11 +201,11 @@ public class WorkerSinkTask implements WorkerTask {
return new WorkerSinkTaskThread(this, "WorkerSinkTask-" + id, time, workerConfig);
}
private void deliverMessages(ConsumerRecords<Object, Object> msgs) {
private void deliverMessages(ConsumerRecords<K, V> msgs) {
// Finally, deliver this batch to the sink
if (msgs.count() > 0) {
List<SinkRecord> records = new ArrayList<>();
for (ConsumerRecord<Object, Object> msg : msgs) {
for (ConsumerRecord<K, V> msg : msgs) {
log.trace("Consuming message with key {}, value {}", msg.key(), msg.value());
records.add(
new SinkRecord(msg.topic(), msg.partition(),

View File

@ -46,33 +46,33 @@ import java.util.concurrent.TimeoutException;
/**
* WorkerTask that uses a SourceTask to ingest data into Kafka.
*/
public class WorkerSourceTask implements WorkerTask {
public class WorkerSourceTask<K, V> implements WorkerTask {
private static final Logger log = LoggerFactory.getLogger(WorkerSourceTask.class);
private ConnectorTaskId id;
private SourceTask task;
private final Converter keyConverter;
private final Converter valueConverter;
private KafkaProducer<Object, Object> producer;
private final Converter<K> keyConverter;
private final Converter<V> valueConverter;
private KafkaProducer<K, V> producer;
private WorkerSourceTaskThread workThread;
private OffsetStorageReader offsetReader;
private OffsetStorageWriter offsetWriter;
private OffsetStorageWriter<K, V> offsetWriter;
private final WorkerConfig workerConfig;
private final Time time;
// Use IdentityHashMap to ensure correctness with duplicate records. This is a HashMap because
// there is no IdentityHashSet.
private IdentityHashMap<ProducerRecord<Object, Object>, ProducerRecord<Object, Object>>
private IdentityHashMap<ProducerRecord<K, V>, ProducerRecord<K, V>>
outstandingMessages;
// A second buffer is used while an offset flush is running
private IdentityHashMap<ProducerRecord<Object, Object>, ProducerRecord<Object, Object>>
private IdentityHashMap<ProducerRecord<K, V>, ProducerRecord<K, V>>
outstandingMessagesBacklog;
private boolean flushing;
public WorkerSourceTask(ConnectorTaskId id, SourceTask task,
Converter keyConverter, Converter valueConverter,
KafkaProducer<Object, Object> producer,
OffsetStorageReader offsetReader, OffsetStorageWriter offsetWriter,
Converter<K> keyConverter, Converter<V> valueConverter,
KafkaProducer<K, V> producer,
OffsetStorageReader offsetReader, OffsetStorageWriter<K, V> offsetWriter,
WorkerConfig workerConfig, Time time) {
this.id = id;
this.task = task;
@ -132,7 +132,7 @@ public class WorkerSourceTask implements WorkerTask {
*/
private synchronized void sendRecords(List<SourceRecord> records) {
for (SourceRecord record : records) {
final ProducerRecord<Object, Object> producerRecord
final ProducerRecord<K, V> producerRecord
= new ProducerRecord<>(record.getTopic(), record.getKafkaPartition(),
keyConverter.fromCopycatData(record.getKey()),
valueConverter.fromCopycatData(record.getValue()));
@ -162,8 +162,8 @@ public class WorkerSourceTask implements WorkerTask {
}
}
private synchronized void recordSent(final ProducerRecord<Object, Object> record) {
ProducerRecord<Object, Object> removed = outstandingMessages.remove(record);
private synchronized void recordSent(final ProducerRecord<K, V> record) {
ProducerRecord<K, V> removed = outstandingMessages.remove(record);
// While flushing, we may also see callbacks for items in the backlog
if (removed == null && flushing)
removed = outstandingMessagesBacklog.remove(record);
@ -275,8 +275,7 @@ public class WorkerSourceTask implements WorkerTask {
private void finishSuccessfulFlush() {
// If we were successful, we can just swap instead of replacing items back into the original map
IdentityHashMap<ProducerRecord<Object, Object>, ProducerRecord<Object, Object>> temp
= outstandingMessages;
IdentityHashMap<ProducerRecord<K, V>, ProducerRecord<K, V>> temp = outstandingMessages;
outstandingMessages = outstandingMessagesBacklog;
outstandingMessagesBacklog = temp;
flushing = false;

View File

@ -94,10 +94,14 @@ public class FileConfigStorage implements ConfigStorage {
}
}
@SuppressWarnings("unchecked")
private void load() {
try {
ObjectInputStream is = new ObjectInputStream(new FileInputStream(filename));
connectorConfig = (Map<String, Properties>) is.readObject();
Object data = is.readObject();
if (!(data instanceof Map))
throw new CopycatRuntimeException("Expected Map but found " + data.getClass());
connectorConfig = (Map<String, Properties>) data;
} catch (FileNotFoundException e) {
// Expected on first run
} catch (IOException | ClassNotFoundException e) {

View File

@ -62,11 +62,14 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
log.info("Stopped FileOffsetBackingStore");
}
@SuppressWarnings("unchecked")
private void load() {
try {
ObjectInputStream is = new ObjectInputStream(new FileInputStream(file));
HashMap<String, Map<byte[], byte[]>> raw
= (HashMap<String, Map<byte[], byte[]>>) is.readObject();
Object obj = is.readObject();
if (!(obj instanceof HashMap))
throw new CopycatRuntimeException("Expected HashMap but found " + obj.getClass());
HashMap<String, Map<byte[], byte[]>> raw = (HashMap<String, Map<byte[], byte[]>>) obj;
data = new HashMap<>();
for (Map.Entry<String, Map<byte[], byte[]>> entry : raw.entrySet()) {
HashMap<ByteBuffer, ByteBuffer> converted = new HashMap<>();

View File

@ -34,20 +34,19 @@ import java.util.Map;
* directly, the interface is only separate from this implementation because it needs to be
* included in the public API package.
*/
public class OffsetStorageReaderImpl implements OffsetStorageReader {
public class OffsetStorageReaderImpl<K, V> implements OffsetStorageReader {
private static final Logger log = LoggerFactory.getLogger(OffsetStorageReaderImpl.class);
private final OffsetBackingStore backingStore;
private final String namespace;
private final Converter keyConverter;
private final Converter valueConverter;
private final Serializer keySerializer;
private final Deserializer valueDeserializer;
private final Converter<K> keyConverter;
private final Converter<V> valueConverter;
private final Serializer<K> keySerializer;
private final Deserializer<V> valueDeserializer;
public OffsetStorageReaderImpl(OffsetBackingStore backingStore, String namespace,
Converter keyConverter, Converter valueConverter,
Serializer keySerializer,
Deserializer valueDeserializer) {
Converter<K> keyConverter, Converter<V> valueConverter,
Serializer<K> keySerializer, Deserializer<V> valueDeserializer) {
this.backingStore = backingStore;
this.namespace = namespace;
this.keyConverter = keyConverter;

View File

@ -63,15 +63,16 @@ import java.util.concurrent.Future;
* This class is not thread-safe. It should only be accessed from a Task's processing thread.
* </p>
*/
public class OffsetStorageWriter {
public class OffsetStorageWriter<K, V> {
private static final Logger log = LoggerFactory.getLogger(OffsetStorageWriter.class);
private final OffsetBackingStore backingStore;
private final Converter keyConverter;
private final Converter valueConverter;
private final Serializer keySerializer;
private final Serializer valueSerializer;
private final Converter<K> keyConverter;
private final Converter<V> valueConverter;
private final Serializer<K> keySerializer;
private final Serializer<V> valueSerializer;
private final String namespace;
// Offset data in Copycat format
private Map<Object, Object> data = new HashMap<>();
// Not synchronized, should only be accessed by flush thread
@ -80,8 +81,8 @@ public class OffsetStorageWriter {
private long currentFlushId = 0;
public OffsetStorageWriter(OffsetBackingStore backingStore,
String namespace, Converter keyConverter, Converter valueConverter,
Serializer keySerializer, Serializer valueSerializer) {
String namespace, Converter<K> keyConverter, Converter<V> valueConverter,
Serializer<K> keySerializer, Serializer<V> valueSerializer) {
this.backingStore = backingStore;
this.namespace = namespace;
this.keyConverter = keyConverter;
@ -90,6 +91,11 @@ public class OffsetStorageWriter {
this.valueSerializer = valueSerializer;
}
/**
* Set an offset for a partition using Copycat data values
* @param partition the partition to store an offset for
* @param offset the offset
*/
public synchronized void setOffset(Object partition, Object offset) {
data.put(partition, offset);
}

View File

@ -21,12 +21,12 @@ import java.util.concurrent.*;
public class FutureCallback<T> implements Callback<T>, Future<T> {
private Callback underlying;
private Callback<T> underlying;
private CountDownLatch finishedLatch;
private T result = null;
private Throwable exception = null;
public FutureCallback(Callback underlying) {
public FutureCallback(Callback<T> underlying) {
this.underlying = underlying;
this.finishedLatch = new CountDownLatch(1);
}

View File

@ -32,6 +32,7 @@ import org.easymock.*;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@ -46,33 +47,36 @@ import static org.junit.Assert.assertEquals;
@PowerMockIgnore("javax.management.*")
public class WorkerSinkTaskTest extends ThreadedTest {
// These are fixed to keep this code simpler
// These are fixed to keep this code simpler. In this example we assume byte[] raw values
// with mix of integer/string in Copycat
private static final String TOPIC = "test";
private static final int PARTITION = 12;
private static final long FIRST_OFFSET = 45;
private static final String KEY = "KEY";
private static final int KEY = 12;
private static final String VALUE = "VALUE";
private static final String TOPIC_PARTITION_STR = "test-12";
private static final byte[] RAW_KEY = "key".getBytes();
private static final byte[] RAW_VALUE = "value".getBytes();
private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, PARTITION);
private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
private Time time;
private SinkTask sinkTask;
@Mock private SinkTask sinkTask;
private WorkerConfig workerConfig;
private Converter keyConverter;
private Converter valueConverter;
private WorkerSinkTask workerTask;
private KafkaConsumer<Object, Object> consumer;
@Mock private Converter<byte[]> keyConverter;
@Mock
private Converter<byte[]> valueConverter;
private WorkerSinkTask<Integer, String> workerTask;
@Mock private KafkaConsumer<byte[], byte[]> consumer;
private WorkerSinkTaskThread workerThread;
private long recordsReturned;
@SuppressWarnings("unchecked")
@Override
public void setup() {
super.setup();
time = new MockTime();
sinkTask = PowerMock.createMock(SinkTask.class);
Properties workerProps = new Properties();
workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
@ -81,8 +85,6 @@ public class WorkerSinkTaskTest extends ThreadedTest {
workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
workerProps.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
workerConfig = new WorkerConfig(workerProps);
keyConverter = PowerMock.createMock(Converter.class);
valueConverter = PowerMock.createMock(Converter.class);
workerTask = PowerMock.createPartialMock(
WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"},
taskId, sinkTask, workerConfig, keyConverter, valueConverter, time);
@ -129,19 +131,15 @@ public class WorkerSinkTaskTest extends ThreadedTest {
public void testDeliverConvertsData() throws Exception {
// Validate conversion is performed when data is delivered
Integer record = 12;
byte[] rawKey = "key".getBytes(), rawValue = "value".getBytes();
ConsumerRecords<Object, Object> records = new ConsumerRecords<>(
ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
Collections.singletonMap(
new TopicPartition("topic", 0),
Collections.singletonList(
new ConsumerRecord<Object, Object>("topic", 0, 0, rawKey, rawValue))));
Collections.singletonList(new ConsumerRecord<>("topic", 0, 0, RAW_KEY, RAW_VALUE))));
// Exact data doesn't matter, but should be passed directly to sink task
EasyMock.expect(keyConverter.toCopycatData(rawKey))
.andReturn(record);
EasyMock.expect(valueConverter.toCopycatData(rawValue))
.andReturn(record);
EasyMock.expect(keyConverter.toCopycatData(RAW_KEY)).andReturn(record);
EasyMock.expect(valueConverter.toCopycatData(RAW_VALUE)).andReturn(record);
Capture<Collection<SinkRecord>> capturedRecords
= EasyMock.newCapture(CaptureType.ALL);
sinkTask.put(EasyMock.capture(capturedRecords));
@ -262,14 +260,13 @@ public class WorkerSinkTaskTest extends ThreadedTest {
PowerMock.verifyAll();
}
private KafkaConsumer<Object, Object> expectInitializeTask(Properties taskProps)
private KafkaConsumer<byte[], byte[]> expectInitializeTask(Properties taskProps)
throws Exception {
sinkTask.initialize(EasyMock.anyObject(SinkTaskContext.class));
PowerMock.expectLastCall();
sinkTask.start(taskProps);
PowerMock.expectLastCall();
consumer = PowerMock.createMock(KafkaConsumer.class);
PowerMock.expectPrivate(workerTask, "createConsumer", taskProps)
.andReturn(consumer);
workerThread = PowerMock.createPartialMock(WorkerSinkTaskThread.class, new String[]{"start"},
@ -302,22 +299,23 @@ public class WorkerSinkTaskTest extends ThreadedTest {
// Stub out all the consumer stream/iterator responses, which we just want to verify occur,
// but don't care about the exact details here.
EasyMock.expect(consumer.poll(EasyMock.anyLong())).andStubAnswer(
new IAnswer<ConsumerRecords<Object, Object>>() {
new IAnswer<ConsumerRecords<byte[], byte[]>>() {
@Override
public ConsumerRecords<Object, Object> answer() throws Throwable {
public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
// "Sleep" so time will progress
time.sleep(pollDelayMs);
ConsumerRecords<Object, Object> records = new ConsumerRecords<>(
Collections.singletonMap(new TopicPartition(TOPIC, PARTITION), Arrays.asList(
new ConsumerRecord<Object, Object>(TOPIC, PARTITION,
FIRST_OFFSET + recordsReturned, KEY,
VALUE))));
ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
Collections.singletonMap(
new TopicPartition(TOPIC, PARTITION),
Arrays.asList(
new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, RAW_KEY, RAW_VALUE)
)));
recordsReturned++;
return records;
}
});
EasyMock.expect(keyConverter.toCopycatData(KEY)).andReturn(KEY).anyTimes();
EasyMock.expect(valueConverter.toCopycatData(VALUE)).andReturn(VALUE).anyTimes();
EasyMock.expect(keyConverter.toCopycatData(RAW_KEY)).andReturn(KEY).anyTimes();
EasyMock.expect(valueConverter.toCopycatData(RAW_VALUE)).andReturn(VALUE).anyTimes();
Capture<Collection<SinkRecord>> capturedRecords = EasyMock.newCapture(CaptureType.ALL);
sinkTask.put(EasyMock.capture(capturedRecords));
EasyMock.expectLastCall().anyTimes();

View File

@ -37,9 +37,13 @@ import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.easymock.IExpectationSetters;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@ -51,27 +55,29 @@ import java.util.concurrent.TimeoutException;
import static org.junit.Assert.*;
@RunWith(PowerMockRunner.class)
public class WorkerSourceTaskTest extends ThreadedTest {
private static final byte[] PARTITION_BYTES = "partition".getBytes();
private static final byte[] OFFSET_BYTES = "offset-1".getBytes();
// Copycat-format data
private static final Integer KEY = -1;
private static final Integer RECORD = 12;
// The actual format of this data doesn't matter -- we just want to see that the right version
private static final Long RECORD = 12L;
// Native-formatted data. The actual format of this data doesn't matter -- we just want to see that the right version
// is used in the right place.
private static final String CONVERTED_KEY = "converted-key";
private static final ByteBuffer CONVERTED_KEY = ByteBuffer.wrap("converted-key".getBytes());
private static final String CONVERTED_RECORD = "converted-record";
private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
private WorkerConfig config;
private SourceTask sourceTask;
private Converter keyConverter;
private Converter valueConverter;
private KafkaProducer<Object, Object> producer;
private OffsetStorageReader offsetReader;
private OffsetStorageWriter offsetWriter;
private WorkerSourceTask workerTask;
private Future<RecordMetadata> sendFuture;
@Mock private SourceTask sourceTask;
@Mock private Converter<ByteBuffer> keyConverter;
@Mock private Converter<String> valueConverter;
@Mock private KafkaProducer<ByteBuffer, String> producer;
@Mock private OffsetStorageReader offsetReader;
@Mock private OffsetStorageWriter<ByteBuffer, String> offsetWriter;
private WorkerSourceTask<ByteBuffer, String> workerTask;
@Mock private Future<RecordMetadata> sendFuture;
private Capture<org.apache.kafka.clients.producer.Callback> producerCallbacks;
@ -91,18 +97,11 @@ public class WorkerSourceTaskTest extends ThreadedTest {
workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
workerProps.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
config = new WorkerConfig(workerProps);
sourceTask = PowerMock.createMock(SourceTask.class);
keyConverter = PowerMock.createMock(Converter.class);
valueConverter = PowerMock.createMock(Converter.class);
producer = PowerMock.createMock(KafkaProducer.class);
offsetReader = PowerMock.createMock(OffsetStorageReader.class);
offsetWriter = PowerMock.createMock(OffsetStorageWriter.class);
sendFuture = PowerMock.createMock(Future.class);
producerCallbacks = EasyMock.newCapture();
}
private void createWorkerTask() {
workerTask = new WorkerSourceTask(taskId, sourceTask, keyConverter, valueConverter, producer,
workerTask = new WorkerSourceTask<>(taskId, sourceTask, keyConverter, valueConverter, producer,
offsetReader, offsetWriter, config, new SystemTime());
}
@ -198,7 +197,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
// Can just use the same record for key and value
records.add(new SourceRecord(PARTITION_BYTES, OFFSET_BYTES, "topic", null, KEY, RECORD));
Capture<ProducerRecord> sent = expectSendRecord();
Capture<ProducerRecord<ByteBuffer, String>> sent = expectSendRecord();
PowerMock.replayAll();
@ -228,11 +227,11 @@ public class WorkerSourceTaskTest extends ThreadedTest {
return latch;
}
private Capture<ProducerRecord> expectSendRecord() throws InterruptedException {
private Capture<ProducerRecord<ByteBuffer, String>> expectSendRecord() throws InterruptedException {
EasyMock.expect(keyConverter.fromCopycatData(KEY)).andStubReturn(CONVERTED_KEY);
EasyMock.expect(valueConverter.fromCopycatData(RECORD)).andStubReturn(CONVERTED_RECORD);
Capture<ProducerRecord> sent = EasyMock.newCapture();
Capture<ProducerRecord<ByteBuffer, String>> sent = EasyMock.newCapture();
// 1. Converted data passed to the producer, which will need callbacks invoked for flush to work
EasyMock.expect(
producer.send(EasyMock.capture(sent),
@ -260,11 +259,11 @@ public class WorkerSourceTaskTest extends ThreadedTest {
latch.await(1000, TimeUnit.MILLISECONDS);
}
@SuppressWarnings("unchecked")
private void expectOffsetFlush(boolean succeed) throws Exception {
EasyMock.expect(offsetWriter.beginFlush()).andReturn(true);
Future<Void> flushFuture = PowerMock.createMock(Future.class);
EasyMock.expect(offsetWriter.doFlush(EasyMock.anyObject(Callback.class)))
.andReturn(flushFuture);
EasyMock.expect(offsetWriter.doFlush(EasyMock.anyObject(Callback.class))).andReturn(flushFuture);
// Should throw for failure
IExpectationSetters<Void> futureGetExpect = EasyMock.expect(
flushFuture.get(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class)));

View File

@ -82,8 +82,7 @@ public class WorkerTest extends ThreadedTest {
WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
PowerMock.mockStatic(Worker.class);
PowerMock.expectPrivate(Worker.class, "instantiateTask", TestSourceTask.class.getName())
.andReturn(task);
PowerMock.expectPrivate(Worker.class, "instantiateTask", TestSourceTask.class.getName()).andReturn(task);
PowerMock.expectNew(
WorkerSourceTask.class, EasyMock.eq(taskId), EasyMock.eq(task),
@ -128,8 +127,7 @@ public class WorkerTest extends ThreadedTest {
WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
PowerMock.mockStatic(Worker.class);
PowerMock.expectPrivate(Worker.class, "instantiateTask", TestSourceTask.class.getName())
.andReturn(task);
PowerMock.expectPrivate(Worker.class, "instantiateTask", TestSourceTask.class.getName()).andReturn(task);
PowerMock.expectNew(
WorkerSourceTask.class, EasyMock.eq(taskId), EasyMock.eq(task),
@ -162,7 +160,7 @@ public class WorkerTest extends ThreadedTest {
}
private static class TestSourceTask extends SourceTask<Object, Object> {
private static class TestSourceTask extends SourceTask {
public TestSourceTask() {
}

View File

@ -18,9 +18,7 @@
package org.apache.kafka.copycat.runtime.standalone;
import org.apache.kafka.copycat.runtime.ConnectorConfig;
import org.apache.kafka.copycat.runtime.Worker;
import org.apache.kafka.copycat.sink.SinkConnector;
import org.apache.kafka.copycat.util.Callback;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -41,7 +39,6 @@ public class StandaloneCoordinatorRestoreTest extends StandaloneCoordinatorTestB
@Before
public void setup() throws Exception {
worker = PowerMock.createMock(Worker.class);
Properties coordinatorProps = new Properties();
coordinatorProps.setProperty(StandaloneCoordinator.STORAGE_CONFIG,
FileConfigStorage.class.getName());
@ -50,7 +47,6 @@ public class StandaloneCoordinatorRestoreTest extends StandaloneCoordinatorTestB
coordinatorProps.setProperty(FileConfigStorage.FILE_CONFIG,
coordinatorConfigFile.getAbsolutePath());
coordinator = new StandaloneCoordinator(worker, coordinatorProps);
createCallback = PowerMock.createMock(Callback.class);
connectorProps = new Properties();
connectorProps.setProperty(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);

View File

@ -42,7 +42,6 @@ public class StandaloneCoordinatorTest extends StandaloneCoordinatorTestBase {
public void setup() {
worker = PowerMock.createMock(Worker.class);
coordinator = new StandaloneCoordinator(worker, new Properties());
createCallback = PowerMock.createMock(Callback.class);
connectorProps = new Properties();
connectorProps.setProperty(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
@ -85,9 +84,9 @@ public class StandaloneCoordinatorTest extends StandaloneCoordinatorTestBase {
PowerMock.replayAll();
coordinator.addConnector(connectorProps, createCallback);
FutureCallback<Void> futureCb = new FutureCallback<>(new Callback() {
FutureCallback<Void> futureCb = new FutureCallback<>(new Callback<Void>() {
@Override
public void onCompletion(Throwable error, Object result) {
public void onCompletion(Throwable error, Void result) {
}
});

View File

@ -30,6 +30,7 @@ import org.apache.kafka.copycat.util.Callback;
import org.apache.kafka.copycat.util.ConnectorTaskId;
import org.easymock.EasyMock;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
import java.util.Arrays;
import java.util.Properties;
@ -40,9 +41,9 @@ public class StandaloneCoordinatorTestBase {
protected static final String TOPICS_LIST_STR = "topic1,topic2";
protected StandaloneCoordinator coordinator;
protected Worker worker;
@Mock protected Worker worker;
protected Connector connector;
protected Callback<String> createCallback;
@Mock protected Callback<String> createCallback;
protected Properties connectorProps;
protected Properties taskProps;

View File

@ -71,8 +71,7 @@ public class FileOffsetBackingStoreTest {
store.set("namespace", firstSet, setCallback).get();
Map<ByteBuffer, ByteBuffer> values
= store.get("namespace", Arrays.asList(buffer("key"), buffer("bad")), getCallback).get();
Map<ByteBuffer, ByteBuffer> values = store.get("namespace", Arrays.asList(buffer("key"), buffer("bad")), getCallback).get();
assertEquals(buffer("value"), values.get(buffer("key")));
assertEquals(null, values.get(buffer("bad")));
@ -92,8 +91,7 @@ public class FileOffsetBackingStoreTest {
FileOffsetBackingStore restore = new FileOffsetBackingStore();
restore.configure(props);
restore.start();
Map<ByteBuffer, ByteBuffer> values
= restore.get("namespace", Arrays.asList(buffer("key")), getCallback).get();
Map<ByteBuffer, ByteBuffer> values = restore.get("namespace", Arrays.asList(buffer("key")), getCallback).get();
assertEquals(buffer("value"), values.get(buffer("key")));
PowerMock.verifyAll();
@ -104,15 +102,16 @@ public class FileOffsetBackingStoreTest {
}
private Callback<Void> expectSuccessfulSetCallback() {
@SuppressWarnings("unchecked")
Callback<Void> setCallback = PowerMock.createMock(Callback.class);
setCallback.onCompletion(null, null);
setCallback.onCompletion(EasyMock.isNull(Throwable.class), EasyMock.isNull(Void.class));
PowerMock.expectLastCall();
return setCallback;
}
@SuppressWarnings("unchecked")
private Callback<Map<ByteBuffer, ByteBuffer>> expectSuccessfulGetCallback() {
Callback<Map<ByteBuffer, ByteBuffer>> getCallback
= PowerMock.createMock(Callback.class);
Callback<Map<ByteBuffer, ByteBuffer>> getCallback = PowerMock.createMock(Callback.class);
getCallback.onCompletion(EasyMock.isNull(Throwable.class), EasyMock.anyObject(Map.class));
PowerMock.expectLastCall();
return getCallback;

View File

@ -26,34 +26,43 @@ import org.easymock.IAnswer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
import org.powermock.modules.junit4.PowerMockRunner;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@RunWith(PowerMockRunner.class)
public class OffsetStorageWriterTest {
private static final String NAMESPACE = "namespace";
private static final String OFFSET_KEY = "key";
// Copycat format - any types should be accepted here
private static final List<String> OFFSET_KEY = Arrays.asList("key", "key");
private static final String OFFSET_VALUE = "value";
private static final String OFFSET_KEY_CONVERTED = "key-converted";
// Native objects - must match serializer types
private static final int OFFSET_KEY_CONVERTED = 12;
private static final String OFFSET_VALUE_CONVERTED = "value-converted";
// Serialized
private static final byte[] OFFSET_KEY_SERIALIZED = "key-serialized".getBytes();
private static final byte[] OFFSET_VALUE_SERIALIZED = "value-serialized".getBytes();
private static final Map<ByteBuffer, ByteBuffer> OFFSETS_SERIALIZED
= Collections.singletonMap(ByteBuffer.wrap(OFFSET_KEY_SERIALIZED),
ByteBuffer.wrap(OFFSET_VALUE_SERIALIZED));
private OffsetBackingStore store;
private Converter keyConverter;
private Converter valueConverter;
private Serializer keySerializer;
private Serializer valueSerializer;
private OffsetStorageWriter writer;
@Mock private OffsetBackingStore store;
@Mock private Converter<Integer> keyConverter;
@Mock private Converter<String> valueConverter;
@Mock private Serializer<Integer> keySerializer;
@Mock private Serializer<String> valueSerializer;
private OffsetStorageWriter<Integer, String> writer;
private static Exception exception = new RuntimeException("error");
@ -61,13 +70,7 @@ public class OffsetStorageWriterTest {
@Before
public void setup() {
store = PowerMock.createMock(OffsetBackingStore.class);
keyConverter = PowerMock.createMock(Converter.class);
valueConverter = PowerMock.createMock(Converter.class);
keySerializer = PowerMock.createMock(Serializer.class);
valueSerializer = PowerMock.createMock(Serializer.class);
writer = new OffsetStorageWriter(store, NAMESPACE, keyConverter, valueConverter, keySerializer, valueSerializer);
writer = new OffsetStorageWriter<>(store, NAMESPACE, keyConverter, valueConverter, keySerializer, valueSerializer);
service = Executors.newFixedThreadPool(1);
}
@ -78,6 +81,7 @@ public class OffsetStorageWriterTest {
@Test
public void testWriteFlush() throws Exception {
@SuppressWarnings("unchecked")
Callback<Void> callback = PowerMock.createMock(Callback.class);
expectStore(callback, false);
@ -109,6 +113,7 @@ public class OffsetStorageWriterTest {
// When a flush fails, we shouldn't just lose the offsets. Instead, they should be restored
// such that a subsequent flush will write them.
@SuppressWarnings("unchecked")
final Callback<Void> callback = PowerMock.createMock(Callback.class);
// First time the write fails
expectStore(callback, true);
@ -130,6 +135,7 @@ public class OffsetStorageWriterTest {
@Test(expected = CopycatRuntimeException.class)
public void testAlreadyFlushing() throws Exception {
@SuppressWarnings("unchecked")
final Callback<Void> callback = PowerMock.createMock(Callback.class);
// Trigger the send, but don't invoke the callback so we'll still be mid-flush
CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1);
@ -158,6 +164,7 @@ public class OffsetStorageWriterTest {
@Test
public void testCancelAfterAwaitFlush() throws Exception {
@SuppressWarnings("unchecked")
Callback<Void> callback = PowerMock.createMock(Callback.class);
CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1);
// In this test, the write should be cancelled so the callback will not be invoked and is not