mirror of https://github.com/apache/kafka.git
Make Converter generic to match serializers since some serialization formats do not require a base class of Object; update many other classes to have generic key and value class type parameters to match this change.
parent b194c7348f
commit 6787a85e87
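To make the motivation concrete, here is an illustrative sketch (not code from this commit): under the old `Object`-based interface, a converter whose native form is `byte[]` had to return `Object` from `fromCopycatData`, forcing a cast before the data could reach a `Serializer<byte[]>`. With the generic `Converter<T>` introduced here, the converter's native type parameter lines up with the serializer's type parameter at compile time. The `ByteArrayConverter` below is hypothetical:

```java
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.copycat.storage.Converter;

// Hypothetical converter for illustration only: its native type is byte[],
// so it pairs with Serializer<byte[]> without casts.
public class ByteArrayConverter implements Converter<byte[]> {
    @Override
    public byte[] fromCopycatData(Object value) {
        // Sketch only: assume the Copycat value already is raw bytes.
        if (!(value instanceof byte[]))
            throw new IllegalArgumentException("Expected byte[] but found " + value.getClass());
        return (byte[]) value;
    }

    @Override
    public Object toCopycatData(byte[] value) {
        return value;
    }

    public static void main(String[] args) {
        Converter<byte[]> converter = new ByteArrayConverter();
        Serializer<byte[]> serializer = new ByteArraySerializer();
        // Converter output feeds the serializer directly; the compiler checks the types.
        byte[] nativeData = converter.fromCopycatData("hello".getBytes());
        byte[] serialized = serializer.serialize("some-topic", nativeData);
        System.out.println(serialized.length + " bytes ready to produce");
    }
}
```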
@@ -24,7 +24,7 @@ import java.util.List;
 /**
  * SourceTask is a Task that pulls records from another system for storage in Kafka.
  */
-public abstract class SourceTask<K, V> implements Task {
+public abstract class SourceTask implements Task {

     protected SourceTaskContext context;

@@ -24,19 +24,19 @@ package org.apache.kafka.copycat.storage;
  * component -- the producer or consumer serializer or deserializer for records or a Copycat
  * serializer or deserializer for offsets.
  */
-public interface Converter {
+public interface Converter<T> {

     /**
      * Convert a Copycat data object to a native object for serialization.
      * @param value
      * @return
      */
-    Object fromCopycatData(Object value);
+    T fromCopycatData(Object value);

     /**
      * Convert a native object to a Copycat data object.
      * @param value
      * @return
      */
-    Object toCopycatData(Object value);
+    Object toCopycatData(T value);
 }
@@ -32,7 +32,7 @@ import java.util.Properties;
 /**
  * FileStreamSourceTask reads from stdin or a file.
  */
-public class FileStreamSourceTask extends SourceTask<Object, Object> {
+public class FileStreamSourceTask extends SourceTask {
     private static final Logger log = LoggerFactory.getLogger(FileStreamSourceTask.class);

     private InputStream stream;
@@ -32,7 +32,7 @@ import java.util.*;
 /**
  * Implementation of Converter that uses JSON to store schemas and objects.
  */
-public class JsonConverter implements Converter {
+public class JsonConverter implements Converter<JsonNode> {

     private static final HashMap<String, JsonToCopycatTypeConverter> TO_COPYCAT_CONVERTERS
             = new HashMap<>();
@@ -106,15 +106,11 @@ public class JsonConverter implements Converter {
     }

     @Override
-    public Object toCopycatData(Object value) {
-        if (!(value instanceof JsonNode))
-            throw new CopycatRuntimeException("JsonConvert can only convert JsonNode objects.");
+    public Object toCopycatData(JsonNode value) {
+        if (!value.isObject() || value.size() != 2 || !value.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !value.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))
+            throw new CopycatRuntimeException("JSON value converted to Copycat must be in envelope containing schema");

-        JsonNode data = (JsonNode) value;
-        if (!data.isObject() || data.size() != 2 || !data.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !data.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))
-            throw new CopycatRuntimeException("JSON data converted to Copycat must be in envelope containing schema");
-
-        return convertToCopycat(data.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME), data.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
+        return convertToCopycat(value.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME), value.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
     }

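For reference, a value that passes the validation above is a two-field JSON object wrapping a schema and a payload. A sketch follows; the literal field names are an assumption here, since JsonConverter reads them from JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME and JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class EnvelopeDemo {
    public static void main(String[] args) throws Exception {
        // A JSON object with exactly two fields, one for the schema and one
        // for the payload -- the shape toCopycatData now checks for.
        ObjectMapper mapper = new ObjectMapper();
        JsonNode envelope = mapper.readTree(
                "{\"schema\": {\"type\": \"int\"}, \"payload\": 42}");
        System.out.println(envelope.isObject() && envelope.size() == 2
                && envelope.has("schema") && envelope.has("payload")); // prints true
    }
}
```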
@@ -71,7 +71,7 @@ public class Copycat {
         try {
             // Destroy any requested connectors
             for (final String connName : config.getList(CopycatConfig.DELETE_CONNECTORS_CONFIG)) {
-                FutureCallback cb = new FutureCallback(new Callback<Void>() {
+                FutureCallback<Void> cb = new FutureCallback<>(new Callback<Void>() {
                     @Override
                     public void onCompletion(Throwable error, Void result) {
                         if (error != null)
@@ -85,7 +85,7 @@ public class Copycat {
             // Create any new connectors
             for (final String connectorPropsFile : config.getList(CopycatConfig.CREATE_CONNECTORS_CONFIG)) {
                 connectorProps = Utils.loadProps(connectorPropsFile);
-                FutureCallback cb = new FutureCallback(new Callback<String>() {
+                FutureCallback<String> cb = new FutureCallback<>(new Callback<String>() {
                     @Override
                     public void onCompletion(Throwable error, String id) {
                         if (error != null)
@@ -128,27 +128,20 @@ public class WorkerConfig extends AbstractConfig {
                         Importance.LOW, OFFSET_COMMIT_TIMEOUT_MS_DOC);
     }

-    private Properties originalProperties;
-
     public WorkerConfig() {
         this(new Properties());
     }

     public WorkerConfig(Properties props) {
         super(config, props);
-        this.originalProperties = props;
     }

     public Properties getUnusedProperties() {
         Set<String> unusedKeys = this.unused();
         Properties unusedProps = new Properties();
         for (String key : unusedKeys) {
-            unusedProps.setProperty(key, originalProperties.getProperty(key));
+            unusedProps.put(key, this.originals().get(key));
         }
         return unusedProps;
     }
-
-    public Properties getOriginalProperties() {
-        return originalProperties;
-    }
 }
@@ -48,20 +48,20 @@ import java.util.Properties;
  * Since each task has a dedicated thread, this is mainly just a container for them.
  * </p>
  */
-public class Worker {
+public class Worker<K, V> {
     private static final Logger log = LoggerFactory.getLogger(Worker.class);

     private Time time;
     private WorkerConfig config;
-    private Converter keyConverter;
-    private Converter valueConverter;
+    private Converter<K> keyConverter;
+    private Converter<V> valueConverter;
     private OffsetBackingStore offsetBackingStore;
-    private Serializer offsetKeySerializer;
-    private Serializer offsetValueSerializer;
-    private Deserializer offsetKeyDeserializer;
-    private Deserializer offsetValueDeserializer;
+    private Serializer<K> offsetKeySerializer;
+    private Serializer<V> offsetValueSerializer;
+    private Deserializer<K> offsetKeyDeserializer;
+    private Deserializer<V> offsetValueDeserializer;
     private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>();
-    private KafkaProducer producer;
+    private KafkaProducer<K, V> producer;
     private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;

     public Worker(WorkerConfig config) {
@@ -70,6 +70,7 @@ public class Worker {
              null, null, null, null);
     }

+    @SuppressWarnings("unchecked")
     public Worker(Time time, WorkerConfig config, OffsetBackingStore offsetBackingStore,
                   Serializer offsetKeySerializer, Serializer offsetValueSerializer,
                   Deserializer offsetKeyDeserializer, Deserializer offsetValueDeserializer) {
@@ -83,28 +84,28 @@ public class Worker {
             this.offsetKeySerializer = offsetKeySerializer;
         } else {
             this.offsetKeySerializer = config.getConfiguredInstance(WorkerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
-            this.offsetKeySerializer.configure(config.getOriginalProperties(), true);
+            this.offsetKeySerializer.configure(config.originals(), true);
         }

         if (offsetValueSerializer != null) {
             this.offsetValueSerializer = offsetValueSerializer;
         } else {
             this.offsetValueSerializer = config.getConfiguredInstance(WorkerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
-            this.offsetValueSerializer.configure(config.getOriginalProperties(), false);
+            this.offsetValueSerializer.configure(config.originals(), false);
         }

         if (offsetKeyDeserializer != null) {
             this.offsetKeyDeserializer = offsetKeyDeserializer;
         } else {
             this.offsetKeyDeserializer = config.getConfiguredInstance(WorkerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
-            this.offsetKeyDeserializer.configure(config.getOriginalProperties(), true);
+            this.offsetKeyDeserializer.configure(config.originals(), true);
         }

         if (offsetValueDeserializer != null) {
             this.offsetValueDeserializer = offsetValueDeserializer;
         } else {
             this.offsetValueDeserializer = config.getConfiguredInstance(WorkerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
-            this.offsetValueDeserializer.configure(config.getOriginalProperties(), false);
+            this.offsetValueDeserializer.configure(config.originals(), false);
         }
     }

@@ -185,14 +186,14 @@ public class Worker {
         final WorkerTask workerTask;
         if (task instanceof SourceTask) {
             SourceTask sourceTask = (SourceTask) task;
-            OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.getConnector(),
+            OffsetStorageReader offsetReader = new OffsetStorageReaderImpl<>(offsetBackingStore, id.getConnector(),
                     keyConverter, valueConverter, offsetKeySerializer, offsetValueDeserializer);
-            OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.getConnector(),
+            OffsetStorageWriter<K, V> offsetWriter = new OffsetStorageWriter<>(offsetBackingStore, id.getConnector(),
                     keyConverter, valueConverter, offsetKeySerializer, offsetValueSerializer);
-            workerTask = new WorkerSourceTask(id, sourceTask, keyConverter, valueConverter, producer,
+            workerTask = new WorkerSourceTask<>(id, sourceTask, keyConverter, valueConverter, producer,
                     offsetReader, offsetWriter, config, time);
         } else if (task instanceof SinkTask) {
-            workerTask = new WorkerSinkTask(id, (SinkTask) task, config, keyConverter, valueConverter, time);
+            workerTask = new WorkerSinkTask<>(id, (SinkTask) task, config, keyConverter, valueConverter, time);
         } else {
             log.error("Tasks must be a subclass of either SourceTask or SinkTask", task);
             throw new CopycatException("Tasks must be a subclass of either SourceTask or SinkTask");
@@ -38,21 +38,21 @@ import java.util.concurrent.TimeUnit;
 /**
  * WorkerTask that uses a SinkTask to export data from Kafka.
  */
-public class WorkerSinkTask implements WorkerTask {
+public class WorkerSinkTask<K, V> implements WorkerTask {
     private static final Logger log = LoggerFactory.getLogger(WorkerSinkTask.class);

     private final ConnectorTaskId id;
     private final SinkTask task;
     private final WorkerConfig workerConfig;
     private final Time time;
-    private final Converter keyConverter;
-    private final Converter valueConverter;
+    private final Converter<K> keyConverter;
+    private final Converter<V> valueConverter;
     private WorkerSinkTaskThread workThread;
-    private KafkaConsumer<Object, Object> consumer;
+    private KafkaConsumer<K, V> consumer;
     private final SinkTaskContext context;

     public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConfig,
-                          Converter keyConverter, Converter valueConverter, Time time) {
+                          Converter<K> keyConverter, Converter<V> valueConverter, Time time) {
         this.id = id;
         this.task = task;
         this.workerConfig = workerConfig;
@@ -107,7 +107,7 @@ public class WorkerSinkTask implements WorkerTask {
     public void poll(long timeoutMs) {
         try {
             log.trace("{} polling consumer with timeout {} ms", id, timeoutMs);
-            ConsumerRecords<Object, Object> msgs = consumer.poll(timeoutMs);
+            ConsumerRecords<K, V> msgs = consumer.poll(timeoutMs);
             log.trace("{} polling returned {} messages", id, msgs.count());
             deliverMessages(msgs);
         } catch (ConsumerWakeupException we) {
@@ -153,7 +153,7 @@ public class WorkerSinkTask implements WorkerTask {
         return workerConfig;
     }

-    private KafkaConsumer<Object, Object> createConsumer(Properties taskProps) {
+    private KafkaConsumer<K, V> createConsumer(Properties taskProps) {
         String topicsStr = taskProps.getProperty(SinkTask.TOPICS_CONFIG);
         if (topicsStr == null || topicsStr.isEmpty())
             throw new CopycatRuntimeException("Sink tasks require a list of topics.");
@@ -172,7 +172,7 @@ public class WorkerSinkTask implements WorkerTask {
         props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                 workerConfig.getClass(WorkerConfig.VALUE_DESERIALIZER_CLASS_CONFIG).getName());

-        KafkaConsumer<Object, Object> newConsumer;
+        KafkaConsumer<K, V> newConsumer;
         try {
             newConsumer = new KafkaConsumer<>(props);
         } catch (Throwable t) {
@@ -201,11 +201,11 @@ public class WorkerSinkTask implements WorkerTask {
         return new WorkerSinkTaskThread(this, "WorkerSinkTask-" + id, time, workerConfig);
     }

-    private void deliverMessages(ConsumerRecords<Object, Object> msgs) {
+    private void deliverMessages(ConsumerRecords<K, V> msgs) {
         // Finally, deliver this batch to the sink
         if (msgs.count() > 0) {
             List<SinkRecord> records = new ArrayList<>();
-            for (ConsumerRecord<Object, Object> msg : msgs) {
+            for (ConsumerRecord<K, V> msg : msgs) {
                 log.trace("Consuming message with key {}, value {}", msg.key(), msg.value());
                 records.add(
                         new SinkRecord(msg.topic(), msg.partition(),
@@ -46,33 +46,33 @@ import java.util.concurrent.TimeoutException;
 /**
  * WorkerTask that uses a SourceTask to ingest data into Kafka.
  */
-public class WorkerSourceTask implements WorkerTask {
+public class WorkerSourceTask<K, V> implements WorkerTask {
     private static final Logger log = LoggerFactory.getLogger(WorkerSourceTask.class);

     private ConnectorTaskId id;
     private SourceTask task;
-    private final Converter keyConverter;
-    private final Converter valueConverter;
-    private KafkaProducer<Object, Object> producer;
+    private final Converter<K> keyConverter;
+    private final Converter<V> valueConverter;
+    private KafkaProducer<K, V> producer;
     private WorkerSourceTaskThread workThread;
     private OffsetStorageReader offsetReader;
-    private OffsetStorageWriter offsetWriter;
+    private OffsetStorageWriter<K, V> offsetWriter;
     private final WorkerConfig workerConfig;
     private final Time time;

     // Use IdentityHashMap to ensure correctness with duplicate records. This is a HashMap because
     // there is no IdentityHashSet.
-    private IdentityHashMap<ProducerRecord<Object, Object>, ProducerRecord<Object, Object>>
+    private IdentityHashMap<ProducerRecord<K, V>, ProducerRecord<K, V>>
             outstandingMessages;
     // A second buffer is used while an offset flush is running
-    private IdentityHashMap<ProducerRecord<Object, Object>, ProducerRecord<Object, Object>>
+    private IdentityHashMap<ProducerRecord<K, V>, ProducerRecord<K, V>>
             outstandingMessagesBacklog;
     private boolean flushing;

     public WorkerSourceTask(ConnectorTaskId id, SourceTask task,
-                            Converter keyConverter, Converter valueConverter,
-                            KafkaProducer<Object, Object> producer,
-                            OffsetStorageReader offsetReader, OffsetStorageWriter offsetWriter,
+                            Converter<K> keyConverter, Converter<V> valueConverter,
+                            KafkaProducer<K, V> producer,
+                            OffsetStorageReader offsetReader, OffsetStorageWriter<K, V> offsetWriter,
                             WorkerConfig workerConfig, Time time) {
         this.id = id;
         this.task = task;
@@ -132,7 +132,7 @@ public class WorkerSourceTask implements WorkerTask {
      */
     private synchronized void sendRecords(List<SourceRecord> records) {
         for (SourceRecord record : records) {
-            final ProducerRecord<Object, Object> producerRecord
+            final ProducerRecord<K, V> producerRecord
                     = new ProducerRecord<>(record.getTopic(), record.getKafkaPartition(),
                     keyConverter.fromCopycatData(record.getKey()),
                     valueConverter.fromCopycatData(record.getValue()));
@@ -162,8 +162,8 @@ public class WorkerSourceTask implements WorkerTask {
         }
     }

-    private synchronized void recordSent(final ProducerRecord<Object, Object> record) {
-        ProducerRecord<Object, Object> removed = outstandingMessages.remove(record);
+    private synchronized void recordSent(final ProducerRecord<K, V> record) {
+        ProducerRecord<K, V> removed = outstandingMessages.remove(record);
         // While flushing, we may also see callbacks for items in the backlog
         if (removed == null && flushing)
             removed = outstandingMessagesBacklog.remove(record);
@@ -275,8 +275,7 @@ public class WorkerSourceTask implements WorkerTask {

     private void finishSuccessfulFlush() {
         // If we were successful, we can just swap instead of replacing items back into the original map
-        IdentityHashMap<ProducerRecord<Object, Object>, ProducerRecord<Object, Object>> temp
-                = outstandingMessages;
+        IdentityHashMap<ProducerRecord<K, V>, ProducerRecord<K, V>> temp = outstandingMessages;
         outstandingMessages = outstandingMessagesBacklog;
         outstandingMessagesBacklog = temp;
         flushing = false;
@@ -94,10 +94,14 @@ public class FileConfigStorage implements ConfigStorage {
         }
     }

+    @SuppressWarnings("unchecked")
     private void load() {
         try {
             ObjectInputStream is = new ObjectInputStream(new FileInputStream(filename));
-            connectorConfig = (Map<String, Properties>) is.readObject();
+            Object data = is.readObject();
+            if (!(data instanceof Map))
+                throw new CopycatRuntimeException("Expected Map but found " + data.getClass());
+            connectorConfig = (Map<String, Properties>) data;
         } catch (FileNotFoundException e) {
             // Expected on first run
         } catch (IOException | ClassNotFoundException e) {
@@ -62,11 +62,14 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
         log.info("Stopped FileOffsetBackingStore");
     }

+    @SuppressWarnings("unchecked")
     private void load() {
         try {
             ObjectInputStream is = new ObjectInputStream(new FileInputStream(file));
-            HashMap<String, Map<byte[], byte[]>> raw
-                    = (HashMap<String, Map<byte[], byte[]>>) is.readObject();
+            Object obj = is.readObject();
+            if (!(obj instanceof HashMap))
+                throw new CopycatRuntimeException("Expected HashMap but found " + obj.getClass());
+            HashMap<String, Map<byte[], byte[]>> raw = (HashMap<String, Map<byte[], byte[]>>) obj;
             data = new HashMap<>();
             for (Map.Entry<String, Map<byte[], byte[]>> entry : raw.entrySet()) {
                 HashMap<ByteBuffer, ByteBuffer> converted = new HashMap<>();
@@ -34,20 +34,19 @@ import java.util.Map;
  * directly, the interface is only separate from this implementation because it needs to be
  * included in the public API package.
  */
-public class OffsetStorageReaderImpl implements OffsetStorageReader {
+public class OffsetStorageReaderImpl<K, V> implements OffsetStorageReader {
     private static final Logger log = LoggerFactory.getLogger(OffsetStorageReaderImpl.class);

     private final OffsetBackingStore backingStore;
     private final String namespace;
-    private final Converter keyConverter;
-    private final Converter valueConverter;
-    private final Serializer keySerializer;
-    private final Deserializer valueDeserializer;
+    private final Converter<K> keyConverter;
+    private final Converter<V> valueConverter;
+    private final Serializer<K> keySerializer;
+    private final Deserializer<V> valueDeserializer;

     public OffsetStorageReaderImpl(OffsetBackingStore backingStore, String namespace,
-                                   Converter keyConverter, Converter valueConverter,
-                                   Serializer keySerializer,
-                                   Deserializer valueDeserializer) {
+                                   Converter<K> keyConverter, Converter<V> valueConverter,
+                                   Serializer<K> keySerializer, Deserializer<V> valueDeserializer) {
         this.backingStore = backingStore;
         this.namespace = namespace;
         this.keyConverter = keyConverter;
@@ -63,15 +63,16 @@ import java.util.concurrent.Future;
  * This class is not thread-safe. It should only be accessed from a Task's processing thread.
  * </p>
  */
-public class OffsetStorageWriter {
+public class OffsetStorageWriter<K, V> {
     private static final Logger log = LoggerFactory.getLogger(OffsetStorageWriter.class);

     private final OffsetBackingStore backingStore;
-    private final Converter keyConverter;
-    private final Converter valueConverter;
-    private final Serializer keySerializer;
-    private final Serializer valueSerializer;
+    private final Converter<K> keyConverter;
+    private final Converter<V> valueConverter;
+    private final Serializer<K> keySerializer;
+    private final Serializer<V> valueSerializer;
     private final String namespace;
+    // Offset data in Copycat format
     private Map<Object, Object> data = new HashMap<>();

     // Not synchronized, should only be accessed by flush thread
@@ -80,8 +81,8 @@ public class OffsetStorageWriter {
     private long currentFlushId = 0;

     public OffsetStorageWriter(OffsetBackingStore backingStore,
-                               String namespace, Converter keyConverter, Converter valueConverter,
-                               Serializer keySerializer, Serializer valueSerializer) {
+                               String namespace, Converter<K> keyConverter, Converter<V> valueConverter,
+                               Serializer<K> keySerializer, Serializer<V> valueSerializer) {
         this.backingStore = backingStore;
         this.namespace = namespace;
         this.keyConverter = keyConverter;
@@ -90,6 +91,11 @@ public class OffsetStorageWriter {
         this.valueSerializer = valueSerializer;
     }

+    /**
+     * Set an offset for a partition using Copycat data values
+     * @param partition the partition to store an offset for
+     * @param offset the offset
+     */
     public synchronized void setOffset(Object partition, Object offset) {
         data.put(partition, offset);
     }
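Putting the constructor and setOffset signatures above together, here is a wiring sketch. Assumptions are labeled in the comments: the hypothetical ByteArrayConverter from the earlier example, a no-arg MemoryOffsetBackingStore from this module, and package locations guessed to be org.apache.kafka.copycat.storage:

```java
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.copycat.storage.Converter;
import org.apache.kafka.copycat.storage.MemoryOffsetBackingStore;
import org.apache.kafka.copycat.storage.OffsetBackingStore;
import org.apache.kafka.copycat.storage.OffsetStorageWriter;

public class OffsetWriterWiringDemo {
    public static void main(String[] args) {
        // The <byte[], byte[]> parameters tie the converters and serializers
        // to the same native key/value types, matching the constructor above.
        // ByteArrayConverter is the hypothetical converter sketched earlier;
        // MemoryOffsetBackingStore is assumed to have a no-arg constructor.
        OffsetBackingStore store = new MemoryOffsetBackingStore();
        Converter<byte[]> keyConverter = new ByteArrayConverter();
        Converter<byte[]> valueConverter = new ByteArrayConverter();
        Serializer<byte[]> keySerializer = new ByteArraySerializer();
        Serializer<byte[]> valueSerializer = new ByteArraySerializer();

        OffsetStorageWriter<byte[], byte[]> writer = new OffsetStorageWriter<>(
                store, "connector-namespace", keyConverter, valueConverter,
                keySerializer, valueSerializer);

        // Copycat-format partition and offset values, per the new setOffset javadoc.
        writer.setOffset("partition-1", 42L);
    }
}
```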
@@ -21,12 +21,12 @@ import java.util.concurrent.*;

 public class FutureCallback<T> implements Callback<T>, Future<T> {

-    private Callback underlying;
+    private Callback<T> underlying;
     private CountDownLatch finishedLatch;
     private T result = null;
     private Throwable exception = null;

-    public FutureCallback(Callback underlying) {
+    public FutureCallback(Callback<T> underlying) {
         this.underlying = underlying;
         this.finishedLatch = new CountDownLatch(1);
     }
@@ -32,6 +32,7 @@ import org.easymock.*;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
@@ -46,33 +47,36 @@ import static org.junit.Assert.assertEquals;
 @PowerMockIgnore("javax.management.*")
 public class WorkerSinkTaskTest extends ThreadedTest {

-    // These are fixed to keep this code simpler
+    // These are fixed to keep this code simpler. In this example we assume byte[] raw values
+    // with mix of integer/string in Copycat
     private static final String TOPIC = "test";
     private static final int PARTITION = 12;
     private static final long FIRST_OFFSET = 45;
-    private static final String KEY = "KEY";
+    private static final int KEY = 12;
     private static final String VALUE = "VALUE";
-    private static final String TOPIC_PARTITION_STR = "test-12";
+    private static final byte[] RAW_KEY = "key".getBytes();
+    private static final byte[] RAW_VALUE = "value".getBytes();

     private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, PARTITION);

     private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
     private Time time;
-    private SinkTask sinkTask;
+    @Mock private SinkTask sinkTask;
     private WorkerConfig workerConfig;
-    private Converter keyConverter;
-    private Converter valueConverter;
-    private WorkerSinkTask workerTask;
-    private KafkaConsumer<Object, Object> consumer;
+    @Mock private Converter<byte[]> keyConverter;
+    @Mock
+    private Converter<byte[]> valueConverter;
+    private WorkerSinkTask<Integer, String> workerTask;
+    @Mock private KafkaConsumer<byte[], byte[]> consumer;
     private WorkerSinkTaskThread workerThread;

     private long recordsReturned;

+    @SuppressWarnings("unchecked")
     @Override
     public void setup() {
         super.setup();
         time = new MockTime();
-        sinkTask = PowerMock.createMock(SinkTask.class);
         Properties workerProps = new Properties();
         workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
         workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
@@ -81,8 +85,6 @@ public class WorkerSinkTaskTest extends ThreadedTest {
         workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
         workerProps.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
         workerConfig = new WorkerConfig(workerProps);
-        keyConverter = PowerMock.createMock(Converter.class);
-        valueConverter = PowerMock.createMock(Converter.class);
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"},
                 taskId, sinkTask, workerConfig, keyConverter, valueConverter, time);
@@ -129,19 +131,15 @@ public class WorkerSinkTaskTest extends ThreadedTest {
     public void testDeliverConvertsData() throws Exception {
         // Validate conversion is performed when data is delivered
         Integer record = 12;
-        byte[] rawKey = "key".getBytes(), rawValue = "value".getBytes();

-        ConsumerRecords<Object, Object> records = new ConsumerRecords<>(
+        ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
                 Collections.singletonMap(
                         new TopicPartition("topic", 0),
-                        Collections.singletonList(
-                                new ConsumerRecord<Object, Object>("topic", 0, 0, rawKey, rawValue))));
+                        Collections.singletonList(new ConsumerRecord<>("topic", 0, 0, RAW_KEY, RAW_VALUE))));

         // Exact data doesn't matter, but should be passed directly to sink task
-        EasyMock.expect(keyConverter.toCopycatData(rawKey))
-                .andReturn(record);
-        EasyMock.expect(valueConverter.toCopycatData(rawValue))
-                .andReturn(record);
+        EasyMock.expect(keyConverter.toCopycatData(RAW_KEY)).andReturn(record);
+        EasyMock.expect(valueConverter.toCopycatData(RAW_VALUE)).andReturn(record);
         Capture<Collection<SinkRecord>> capturedRecords
                 = EasyMock.newCapture(CaptureType.ALL);
         sinkTask.put(EasyMock.capture(capturedRecords));
@@ -262,14 +260,13 @@ public class WorkerSinkTaskTest extends ThreadedTest {
         PowerMock.verifyAll();
     }

-    private KafkaConsumer<Object, Object> expectInitializeTask(Properties taskProps)
+    private KafkaConsumer<byte[], byte[]> expectInitializeTask(Properties taskProps)
             throws Exception {
         sinkTask.initialize(EasyMock.anyObject(SinkTaskContext.class));
         PowerMock.expectLastCall();
         sinkTask.start(taskProps);
         PowerMock.expectLastCall();

-        consumer = PowerMock.createMock(KafkaConsumer.class);
         PowerMock.expectPrivate(workerTask, "createConsumer", taskProps)
                 .andReturn(consumer);
         workerThread = PowerMock.createPartialMock(WorkerSinkTaskThread.class, new String[]{"start"},
@@ -302,22 +299,23 @@ public class WorkerSinkTaskTest extends ThreadedTest {
         // Stub out all the consumer stream/iterator responses, which we just want to verify occur,
         // but don't care about the exact details here.
         EasyMock.expect(consumer.poll(EasyMock.anyLong())).andStubAnswer(
-                new IAnswer<ConsumerRecords<Object, Object>>() {
+                new IAnswer<ConsumerRecords<byte[], byte[]>>() {
                     @Override
-                    public ConsumerRecords<Object, Object> answer() throws Throwable {
+                    public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
                         // "Sleep" so time will progress
                         time.sleep(pollDelayMs);
-                        ConsumerRecords<Object, Object> records = new ConsumerRecords<>(
-                                Collections.singletonMap(new TopicPartition(TOPIC, PARTITION), Arrays.asList(
-                                        new ConsumerRecord<Object, Object>(TOPIC, PARTITION,
-                                                FIRST_OFFSET + recordsReturned, KEY,
-                                                VALUE))));
+                        ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
+                                Collections.singletonMap(
+                                        new TopicPartition(TOPIC, PARTITION),
+                                        Arrays.asList(
+                                                new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, RAW_KEY, RAW_VALUE)
+                                        )));
                         recordsReturned++;
                         return records;
                     }
                 });
-        EasyMock.expect(keyConverter.toCopycatData(KEY)).andReturn(KEY).anyTimes();
-        EasyMock.expect(valueConverter.toCopycatData(VALUE)).andReturn(VALUE).anyTimes();
+        EasyMock.expect(keyConverter.toCopycatData(RAW_KEY)).andReturn(KEY).anyTimes();
+        EasyMock.expect(valueConverter.toCopycatData(RAW_VALUE)).andReturn(VALUE).anyTimes();
         Capture<Collection<SinkRecord>> capturedRecords = EasyMock.newCapture(CaptureType.ALL);
         sinkTask.put(EasyMock.capture(capturedRecords));
         EasyMock.expectLastCall().anyTimes();
@@ -37,9 +37,13 @@ import org.easymock.EasyMock;
 import org.easymock.IAnswer;
 import org.easymock.IExpectationSetters;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.reflect.Whitebox;

+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -51,27 +55,29 @@ import java.util.concurrent.TimeoutException;

 import static org.junit.Assert.*;

+@RunWith(PowerMockRunner.class)
 public class WorkerSourceTaskTest extends ThreadedTest {
     private static final byte[] PARTITION_BYTES = "partition".getBytes();
     private static final byte[] OFFSET_BYTES = "offset-1".getBytes();

+    // Copycat-format data
     private static final Integer KEY = -1;
-    private static final Integer RECORD = 12;
-    // The actual format of this data doesn't matter -- we just want to see that the right version
+    private static final Long RECORD = 12L;
+    // Native-formatted data. The actual format of this data doesn't matter -- we just want to see that the right version
     // is used in the right place.
-    private static final String CONVERTED_KEY = "converted-key";
+    private static final ByteBuffer CONVERTED_KEY = ByteBuffer.wrap("converted-key".getBytes());
     private static final String CONVERTED_RECORD = "converted-record";

     private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
     private WorkerConfig config;
-    private SourceTask sourceTask;
-    private Converter keyConverter;
-    private Converter valueConverter;
-    private KafkaProducer<Object, Object> producer;
-    private OffsetStorageReader offsetReader;
-    private OffsetStorageWriter offsetWriter;
-    private WorkerSourceTask workerTask;
-    private Future<RecordMetadata> sendFuture;
+    @Mock private SourceTask sourceTask;
+    @Mock private Converter<ByteBuffer> keyConverter;
+    @Mock private Converter<String> valueConverter;
+    @Mock private KafkaProducer<ByteBuffer, String> producer;
+    @Mock private OffsetStorageReader offsetReader;
+    @Mock private OffsetStorageWriter<ByteBuffer, String> offsetWriter;
+    private WorkerSourceTask<ByteBuffer, String> workerTask;
+    @Mock private Future<RecordMetadata> sendFuture;

     private Capture<org.apache.kafka.clients.producer.Callback> producerCallbacks;

@@ -91,18 +97,11 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
         workerProps.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
         config = new WorkerConfig(workerProps);
-        sourceTask = PowerMock.createMock(SourceTask.class);
-        keyConverter = PowerMock.createMock(Converter.class);
-        valueConverter = PowerMock.createMock(Converter.class);
-        producer = PowerMock.createMock(KafkaProducer.class);
-        offsetReader = PowerMock.createMock(OffsetStorageReader.class);
-        offsetWriter = PowerMock.createMock(OffsetStorageWriter.class);
-        sendFuture = PowerMock.createMock(Future.class);
         producerCallbacks = EasyMock.newCapture();
     }

     private void createWorkerTask() {
-        workerTask = new WorkerSourceTask(taskId, sourceTask, keyConverter, valueConverter, producer,
+        workerTask = new WorkerSourceTask<>(taskId, sourceTask, keyConverter, valueConverter, producer,
                 offsetReader, offsetWriter, config, new SystemTime());
     }

@@ -198,7 +197,7 @@
         // Can just use the same record for key and value
         records.add(new SourceRecord(PARTITION_BYTES, OFFSET_BYTES, "topic", null, KEY, RECORD));

-        Capture<ProducerRecord> sent = expectSendRecord();
+        Capture<ProducerRecord<ByteBuffer, String>> sent = expectSendRecord();

         PowerMock.replayAll();

@@ -228,11 +227,11 @@
         return latch;
     }

-    private Capture<ProducerRecord> expectSendRecord() throws InterruptedException {
+    private Capture<ProducerRecord<ByteBuffer, String>> expectSendRecord() throws InterruptedException {
         EasyMock.expect(keyConverter.fromCopycatData(KEY)).andStubReturn(CONVERTED_KEY);
         EasyMock.expect(valueConverter.fromCopycatData(RECORD)).andStubReturn(CONVERTED_RECORD);

-        Capture<ProducerRecord> sent = EasyMock.newCapture();
+        Capture<ProducerRecord<ByteBuffer, String>> sent = EasyMock.newCapture();
         // 1. Converted data passed to the producer, which will need callbacks invoked for flush to work
         EasyMock.expect(
                 producer.send(EasyMock.capture(sent),
@@ -260,11 +259,11 @@
         latch.await(1000, TimeUnit.MILLISECONDS);
     }

+    @SuppressWarnings("unchecked")
     private void expectOffsetFlush(boolean succeed) throws Exception {
         EasyMock.expect(offsetWriter.beginFlush()).andReturn(true);
         Future<Void> flushFuture = PowerMock.createMock(Future.class);
-        EasyMock.expect(offsetWriter.doFlush(EasyMock.anyObject(Callback.class)))
-                .andReturn(flushFuture);
+        EasyMock.expect(offsetWriter.doFlush(EasyMock.anyObject(Callback.class))).andReturn(flushFuture);
         // Should throw for failure
         IExpectationSetters<Void> futureGetExpect = EasyMock.expect(
                 flushFuture.get(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class)));
@@ -82,8 +82,7 @@ public class WorkerTest extends ThreadedTest {
         WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);

         PowerMock.mockStatic(Worker.class);
-        PowerMock.expectPrivate(Worker.class, "instantiateTask", TestSourceTask.class.getName())
-                .andReturn(task);
+        PowerMock.expectPrivate(Worker.class, "instantiateTask", TestSourceTask.class.getName()).andReturn(task);

         PowerMock.expectNew(
                 WorkerSourceTask.class, EasyMock.eq(taskId), EasyMock.eq(task),
@@ -128,8 +127,7 @@ public class WorkerTest extends ThreadedTest {
         WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);

         PowerMock.mockStatic(Worker.class);
-        PowerMock.expectPrivate(Worker.class, "instantiateTask", TestSourceTask.class.getName())
-                .andReturn(task);
+        PowerMock.expectPrivate(Worker.class, "instantiateTask", TestSourceTask.class.getName()).andReturn(task);

         PowerMock.expectNew(
                 WorkerSourceTask.class, EasyMock.eq(taskId), EasyMock.eq(task),
@@ -162,7 +160,7 @@ public class WorkerTest extends ThreadedTest {
     }


-    private static class TestSourceTask extends SourceTask<Object, Object> {
+    private static class TestSourceTask extends SourceTask {
         public TestSourceTask() {
         }

@@ -18,9 +18,7 @@
 package org.apache.kafka.copycat.runtime.standalone;

 import org.apache.kafka.copycat.runtime.ConnectorConfig;
-import org.apache.kafka.copycat.runtime.Worker;
-import org.apache.kafka.copycat.sink.SinkConnector;
 import org.apache.kafka.copycat.util.Callback;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -41,7 +39,6 @@ public class StandaloneCoordinatorRestoreTest extends StandaloneCoordinatorTestBase {

     @Before
     public void setup() throws Exception {
-        worker = PowerMock.createMock(Worker.class);
         Properties coordinatorProps = new Properties();
         coordinatorProps.setProperty(StandaloneCoordinator.STORAGE_CONFIG,
                 FileConfigStorage.class.getName());
@@ -50,7 +47,6 @@ public class StandaloneCoordinatorRestoreTest extends StandaloneCoordinatorTestBase {
         coordinatorProps.setProperty(FileConfigStorage.FILE_CONFIG,
                 coordinatorConfigFile.getAbsolutePath());
         coordinator = new StandaloneCoordinator(worker, coordinatorProps);
-        createCallback = PowerMock.createMock(Callback.class);

         connectorProps = new Properties();
         connectorProps.setProperty(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
@@ -42,7 +42,6 @@ public class StandaloneCoordinatorTest extends StandaloneCoordinatorTestBase {
     public void setup() {
         worker = PowerMock.createMock(Worker.class);
         coordinator = new StandaloneCoordinator(worker, new Properties());
-        createCallback = PowerMock.createMock(Callback.class);

         connectorProps = new Properties();
         connectorProps.setProperty(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
@@ -85,9 +84,9 @@ public class StandaloneCoordinatorTest extends StandaloneCoordinatorTestBase {
         PowerMock.replayAll();

         coordinator.addConnector(connectorProps, createCallback);
-        FutureCallback<Void> futureCb = new FutureCallback<>(new Callback() {
+        FutureCallback<Void> futureCb = new FutureCallback<>(new Callback<Void>() {
             @Override
-            public void onCompletion(Throwable error, Object result) {
+            public void onCompletion(Throwable error, Void result) {

             }
         });
@@ -30,6 +30,7 @@ import org.apache.kafka.copycat.util.Callback;
 import org.apache.kafka.copycat.util.ConnectorTaskId;
 import org.easymock.EasyMock;
 import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;

 import java.util.Arrays;
 import java.util.Properties;
@@ -40,9 +41,9 @@ public class StandaloneCoordinatorTestBase {
     protected static final String TOPICS_LIST_STR = "topic1,topic2";

     protected StandaloneCoordinator coordinator;
-    protected Worker worker;
+    @Mock protected Worker worker;
     protected Connector connector;
-    protected Callback<String> createCallback;
+    @Mock protected Callback<String> createCallback;

     protected Properties connectorProps;
     protected Properties taskProps;
@@ -71,8 +71,7 @@ public class FileOffsetBackingStoreTest {

         store.set("namespace", firstSet, setCallback).get();

-        Map<ByteBuffer, ByteBuffer> values
-                = store.get("namespace", Arrays.asList(buffer("key"), buffer("bad")), getCallback).get();
+        Map<ByteBuffer, ByteBuffer> values = store.get("namespace", Arrays.asList(buffer("key"), buffer("bad")), getCallback).get();
         assertEquals(buffer("value"), values.get(buffer("key")));
         assertEquals(null, values.get(buffer("bad")));

@@ -92,8 +91,7 @@ public class FileOffsetBackingStoreTest {
         FileOffsetBackingStore restore = new FileOffsetBackingStore();
         restore.configure(props);
         restore.start();
-        Map<ByteBuffer, ByteBuffer> values
-                = restore.get("namespace", Arrays.asList(buffer("key")), getCallback).get();
+        Map<ByteBuffer, ByteBuffer> values = restore.get("namespace", Arrays.asList(buffer("key")), getCallback).get();
         assertEquals(buffer("value"), values.get(buffer("key")));

         PowerMock.verifyAll();
@@ -104,15 +102,16 @@ public class FileOffsetBackingStoreTest {
     }

     private Callback<Void> expectSuccessfulSetCallback() {
+        @SuppressWarnings("unchecked")
         Callback<Void> setCallback = PowerMock.createMock(Callback.class);
-        setCallback.onCompletion(null, null);
+        setCallback.onCompletion(EasyMock.isNull(Throwable.class), EasyMock.isNull(Void.class));
         PowerMock.expectLastCall();
         return setCallback;
     }

+    @SuppressWarnings("unchecked")
     private Callback<Map<ByteBuffer, ByteBuffer>> expectSuccessfulGetCallback() {
-        Callback<Map<ByteBuffer, ByteBuffer>> getCallback
-                = PowerMock.createMock(Callback.class);
+        Callback<Map<ByteBuffer, ByteBuffer>> getCallback = PowerMock.createMock(Callback.class);
         getCallback.onCompletion(EasyMock.isNull(Throwable.class), EasyMock.anyObject(Map.class));
         PowerMock.expectLastCall();
         return getCallback;
@@ -26,34 +26,43 @@ import org.easymock.IAnswer;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.modules.junit4.PowerMockRunner;

 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.*;

 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;

+@RunWith(PowerMockRunner.class)
 public class OffsetStorageWriterTest {
     private static final String NAMESPACE = "namespace";
-    private static final String OFFSET_KEY = "key";
+    // Copycat format - any types should be accepted here
+    private static final List<String> OFFSET_KEY = Arrays.asList("key", "key");
     private static final String OFFSET_VALUE = "value";
-    private static final String OFFSET_KEY_CONVERTED = "key-converted";
+    // Native objects - must match serializer types
+    private static final int OFFSET_KEY_CONVERTED = 12;
     private static final String OFFSET_VALUE_CONVERTED = "value-converted";
+    // Serialized
     private static final byte[] OFFSET_KEY_SERIALIZED = "key-serialized".getBytes();
     private static final byte[] OFFSET_VALUE_SERIALIZED = "value-serialized".getBytes();
     private static final Map<ByteBuffer, ByteBuffer> OFFSETS_SERIALIZED
             = Collections.singletonMap(ByteBuffer.wrap(OFFSET_KEY_SERIALIZED),
             ByteBuffer.wrap(OFFSET_VALUE_SERIALIZED));

-    private OffsetBackingStore store;
-    private Converter keyConverter;
-    private Converter valueConverter;
-    private Serializer keySerializer;
-    private Serializer valueSerializer;
-    private OffsetStorageWriter writer;
+    @Mock private OffsetBackingStore store;
+    @Mock private Converter<Integer> keyConverter;
+    @Mock private Converter<String> valueConverter;
+    @Mock private Serializer<Integer> keySerializer;
+    @Mock private Serializer<String> valueSerializer;
+    private OffsetStorageWriter<Integer, String> writer;

     private static Exception exception = new RuntimeException("error");

@@ -61,13 +70,7 @@ public class OffsetStorageWriterTest {

     @Before
     public void setup() {
-        store = PowerMock.createMock(OffsetBackingStore.class);
-        keyConverter = PowerMock.createMock(Converter.class);
-        valueConverter = PowerMock.createMock(Converter.class);
-        keySerializer = PowerMock.createMock(Serializer.class);
-        valueSerializer = PowerMock.createMock(Serializer.class);
-        writer = new OffsetStorageWriter(store, NAMESPACE, keyConverter, valueConverter, keySerializer, valueSerializer);
-
+        writer = new OffsetStorageWriter<>(store, NAMESPACE, keyConverter, valueConverter, keySerializer, valueSerializer);
         service = Executors.newFixedThreadPool(1);
     }

@@ -78,6 +81,7 @@ public class OffsetStorageWriterTest {

     @Test
     public void testWriteFlush() throws Exception {
+        @SuppressWarnings("unchecked")
         Callback<Void> callback = PowerMock.createMock(Callback.class);
         expectStore(callback, false);

@@ -109,6 +113,7 @@ public class OffsetStorageWriterTest {
         // When a flush fails, we shouldn't just lose the offsets. Instead, they should be restored
         // such that a subsequent flush will write them.

+        @SuppressWarnings("unchecked")
         final Callback<Void> callback = PowerMock.createMock(Callback.class);
         // First time the write fails
         expectStore(callback, true);
@@ -130,6 +135,7 @@ public class OffsetStorageWriterTest {

     @Test(expected = CopycatRuntimeException.class)
     public void testAlreadyFlushing() throws Exception {
+        @SuppressWarnings("unchecked")
         final Callback<Void> callback = PowerMock.createMock(Callback.class);
         // Trigger the send, but don't invoke the callback so we'll still be mid-flush
         CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1);
@@ -158,6 +164,7 @@ public class OffsetStorageWriterTest {

     @Test
     public void testCancelAfterAwaitFlush() throws Exception {
+        @SuppressWarnings("unchecked")
         Callback<Void> callback = PowerMock.createMock(Callback.class);
         CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1);
         // In this test, the write should be cancelled so the callback will not be invoked and is not