mirror of https://github.com/apache/kafka.git
Make Converter generic to match serializers since some serialization formats do not require a base class of Object; update many other classes to have generic key and value class type parameters to match this change.
This commit is contained in:
parent
b194c7348f
commit
6787a85e87
|
@ -24,7 +24,7 @@ import java.util.List;
|
||||||
/**
|
/**
|
||||||
* SourceTask is a Task that pulls records from another system for storage in Kafka.
|
* SourceTask is a Task that pulls records from another system for storage in Kafka.
|
||||||
*/
|
*/
|
||||||
public abstract class SourceTask<K, V> implements Task {
|
public abstract class SourceTask implements Task {
|
||||||
|
|
||||||
protected SourceTaskContext context;
|
protected SourceTaskContext context;
|
||||||
|
|
||||||
|
|
|
@ -24,19 +24,19 @@ package org.apache.kafka.copycat.storage;
|
||||||
* component -- the producer or consumer serializer or deserializer for records or a Copycat
|
* component -- the producer or consumer serializer or deserializer for records or a Copycat
|
||||||
* serializer or deserializer for offsets.
|
* serializer or deserializer for offsets.
|
||||||
*/
|
*/
|
||||||
public interface Converter {
|
public interface Converter<T> {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Convert a Copycat data object to a native object for serialization.
|
* Convert a Copycat data object to a native object for serialization.
|
||||||
* @param value
|
* @param value
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
Object fromCopycatData(Object value);
|
T fromCopycatData(Object value);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Convert a native object to a Copycat data object.
|
* Convert a native object to a Copycat data object.
|
||||||
* @param value
|
* @param value
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
Object toCopycatData(Object value);
|
Object toCopycatData(T value);
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,7 +32,7 @@ import java.util.Properties;
|
||||||
/**
|
/**
|
||||||
* FileStreamSourceTask reads from stdin or a file.
|
* FileStreamSourceTask reads from stdin or a file.
|
||||||
*/
|
*/
|
||||||
public class FileStreamSourceTask extends SourceTask<Object, Object> {
|
public class FileStreamSourceTask extends SourceTask {
|
||||||
private static final Logger log = LoggerFactory.getLogger(FileStreamSourceTask.class);
|
private static final Logger log = LoggerFactory.getLogger(FileStreamSourceTask.class);
|
||||||
|
|
||||||
private InputStream stream;
|
private InputStream stream;
|
||||||
|
|
|
@ -32,7 +32,7 @@ import java.util.*;
|
||||||
/**
|
/**
|
||||||
* Implementation of Converter that uses JSON to store schemas and objects.
|
* Implementation of Converter that uses JSON to store schemas and objects.
|
||||||
*/
|
*/
|
||||||
public class JsonConverter implements Converter {
|
public class JsonConverter implements Converter<JsonNode> {
|
||||||
|
|
||||||
private static final HashMap<String, JsonToCopycatTypeConverter> TO_COPYCAT_CONVERTERS
|
private static final HashMap<String, JsonToCopycatTypeConverter> TO_COPYCAT_CONVERTERS
|
||||||
= new HashMap<>();
|
= new HashMap<>();
|
||||||
|
@ -106,15 +106,11 @@ public class JsonConverter implements Converter {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Object toCopycatData(Object value) {
|
public Object toCopycatData(JsonNode value) {
|
||||||
if (!(value instanceof JsonNode))
|
if (!value.isObject() || value.size() != 2 || !value.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !value.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))
|
||||||
throw new CopycatRuntimeException("JsonConvert can only convert JsonNode objects.");
|
throw new CopycatRuntimeException("JSON value converted to Copycat must be in envelope containing schema");
|
||||||
|
|
||||||
JsonNode data = (JsonNode) value;
|
return convertToCopycat(value.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME), value.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
|
||||||
if (!data.isObject() || data.size() != 2 || !data.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !data.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))
|
|
||||||
throw new CopycatRuntimeException("JSON data converted to Copycat must be in envelope containing schema");
|
|
||||||
|
|
||||||
return convertToCopycat(data.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME), data.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -71,7 +71,7 @@ public class Copycat {
|
||||||
try {
|
try {
|
||||||
// Destroy any requested connectors
|
// Destroy any requested connectors
|
||||||
for (final String connName : config.getList(CopycatConfig.DELETE_CONNECTORS_CONFIG)) {
|
for (final String connName : config.getList(CopycatConfig.DELETE_CONNECTORS_CONFIG)) {
|
||||||
FutureCallback cb = new FutureCallback(new Callback<Void>() {
|
FutureCallback<Void> cb = new FutureCallback<>(new Callback<Void>() {
|
||||||
@Override
|
@Override
|
||||||
public void onCompletion(Throwable error, Void result) {
|
public void onCompletion(Throwable error, Void result) {
|
||||||
if (error != null)
|
if (error != null)
|
||||||
|
@ -85,7 +85,7 @@ public class Copycat {
|
||||||
// Create any new connectors
|
// Create any new connectors
|
||||||
for (final String connectorPropsFile : config.getList(CopycatConfig.CREATE_CONNECTORS_CONFIG)) {
|
for (final String connectorPropsFile : config.getList(CopycatConfig.CREATE_CONNECTORS_CONFIG)) {
|
||||||
connectorProps = Utils.loadProps(connectorPropsFile);
|
connectorProps = Utils.loadProps(connectorPropsFile);
|
||||||
FutureCallback cb = new FutureCallback(new Callback<String>() {
|
FutureCallback<String> cb = new FutureCallback<>(new Callback<String>() {
|
||||||
@Override
|
@Override
|
||||||
public void onCompletion(Throwable error, String id) {
|
public void onCompletion(Throwable error, String id) {
|
||||||
if (error != null)
|
if (error != null)
|
||||||
|
|
|
@ -128,27 +128,20 @@ public class WorkerConfig extends AbstractConfig {
|
||||||
Importance.LOW, OFFSET_COMMIT_TIMEOUT_MS_DOC);
|
Importance.LOW, OFFSET_COMMIT_TIMEOUT_MS_DOC);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Properties originalProperties;
|
|
||||||
|
|
||||||
public WorkerConfig() {
|
public WorkerConfig() {
|
||||||
this(new Properties());
|
this(new Properties());
|
||||||
}
|
}
|
||||||
|
|
||||||
public WorkerConfig(Properties props) {
|
public WorkerConfig(Properties props) {
|
||||||
super(config, props);
|
super(config, props);
|
||||||
this.originalProperties = props;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Properties getUnusedProperties() {
|
public Properties getUnusedProperties() {
|
||||||
Set<String> unusedKeys = this.unused();
|
Set<String> unusedKeys = this.unused();
|
||||||
Properties unusedProps = new Properties();
|
Properties unusedProps = new Properties();
|
||||||
for (String key : unusedKeys) {
|
for (String key : unusedKeys) {
|
||||||
unusedProps.setProperty(key, originalProperties.getProperty(key));
|
unusedProps.put(key, this.originals().get(key));
|
||||||
}
|
}
|
||||||
return unusedProps;
|
return unusedProps;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Properties getOriginalProperties() {
|
|
||||||
return originalProperties;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -48,20 +48,20 @@ import java.util.Properties;
|
||||||
* Since each task has a dedicated thread, this is mainly just a container for them.
|
* Since each task has a dedicated thread, this is mainly just a container for them.
|
||||||
* </p>
|
* </p>
|
||||||
*/
|
*/
|
||||||
public class Worker {
|
public class Worker<K, V> {
|
||||||
private static final Logger log = LoggerFactory.getLogger(Worker.class);
|
private static final Logger log = LoggerFactory.getLogger(Worker.class);
|
||||||
|
|
||||||
private Time time;
|
private Time time;
|
||||||
private WorkerConfig config;
|
private WorkerConfig config;
|
||||||
private Converter keyConverter;
|
private Converter<K> keyConverter;
|
||||||
private Converter valueConverter;
|
private Converter<V> valueConverter;
|
||||||
private OffsetBackingStore offsetBackingStore;
|
private OffsetBackingStore offsetBackingStore;
|
||||||
private Serializer offsetKeySerializer;
|
private Serializer<K> offsetKeySerializer;
|
||||||
private Serializer offsetValueSerializer;
|
private Serializer<V> offsetValueSerializer;
|
||||||
private Deserializer offsetKeyDeserializer;
|
private Deserializer<K> offsetKeyDeserializer;
|
||||||
private Deserializer offsetValueDeserializer;
|
private Deserializer<V> offsetValueDeserializer;
|
||||||
private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>();
|
private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>();
|
||||||
private KafkaProducer producer;
|
private KafkaProducer<K, V> producer;
|
||||||
private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
|
private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
|
||||||
|
|
||||||
public Worker(WorkerConfig config) {
|
public Worker(WorkerConfig config) {
|
||||||
|
@ -70,6 +70,7 @@ public class Worker {
|
||||||
null, null, null, null);
|
null, null, null, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
public Worker(Time time, WorkerConfig config, OffsetBackingStore offsetBackingStore,
|
public Worker(Time time, WorkerConfig config, OffsetBackingStore offsetBackingStore,
|
||||||
Serializer offsetKeySerializer, Serializer offsetValueSerializer,
|
Serializer offsetKeySerializer, Serializer offsetValueSerializer,
|
||||||
Deserializer offsetKeyDeserializer, Deserializer offsetValueDeserializer) {
|
Deserializer offsetKeyDeserializer, Deserializer offsetValueDeserializer) {
|
||||||
|
@ -83,28 +84,28 @@ public class Worker {
|
||||||
this.offsetKeySerializer = offsetKeySerializer;
|
this.offsetKeySerializer = offsetKeySerializer;
|
||||||
} else {
|
} else {
|
||||||
this.offsetKeySerializer = config.getConfiguredInstance(WorkerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
|
this.offsetKeySerializer = config.getConfiguredInstance(WorkerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
|
||||||
this.offsetKeySerializer.configure(config.getOriginalProperties(), true);
|
this.offsetKeySerializer.configure(config.originals(), true);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (offsetValueSerializer != null) {
|
if (offsetValueSerializer != null) {
|
||||||
this.offsetValueSerializer = offsetValueSerializer;
|
this.offsetValueSerializer = offsetValueSerializer;
|
||||||
} else {
|
} else {
|
||||||
this.offsetValueSerializer = config.getConfiguredInstance(WorkerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
|
this.offsetValueSerializer = config.getConfiguredInstance(WorkerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
|
||||||
this.offsetValueSerializer.configure(config.getOriginalProperties(), false);
|
this.offsetValueSerializer.configure(config.originals(), false);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (offsetKeyDeserializer != null) {
|
if (offsetKeyDeserializer != null) {
|
||||||
this.offsetKeyDeserializer = offsetKeyDeserializer;
|
this.offsetKeyDeserializer = offsetKeyDeserializer;
|
||||||
} else {
|
} else {
|
||||||
this.offsetKeyDeserializer = config.getConfiguredInstance(WorkerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
|
this.offsetKeyDeserializer = config.getConfiguredInstance(WorkerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
|
||||||
this.offsetKeyDeserializer.configure(config.getOriginalProperties(), true);
|
this.offsetKeyDeserializer.configure(config.originals(), true);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (offsetValueDeserializer != null) {
|
if (offsetValueDeserializer != null) {
|
||||||
this.offsetValueDeserializer = offsetValueDeserializer;
|
this.offsetValueDeserializer = offsetValueDeserializer;
|
||||||
} else {
|
} else {
|
||||||
this.offsetValueDeserializer = config.getConfiguredInstance(WorkerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
|
this.offsetValueDeserializer = config.getConfiguredInstance(WorkerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
|
||||||
this.offsetValueDeserializer.configure(config.getOriginalProperties(), false);
|
this.offsetValueDeserializer.configure(config.originals(), false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -185,14 +186,14 @@ public class Worker {
|
||||||
final WorkerTask workerTask;
|
final WorkerTask workerTask;
|
||||||
if (task instanceof SourceTask) {
|
if (task instanceof SourceTask) {
|
||||||
SourceTask sourceTask = (SourceTask) task;
|
SourceTask sourceTask = (SourceTask) task;
|
||||||
OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.getConnector(),
|
OffsetStorageReader offsetReader = new OffsetStorageReaderImpl<>(offsetBackingStore, id.getConnector(),
|
||||||
keyConverter, valueConverter, offsetKeySerializer, offsetValueDeserializer);
|
keyConverter, valueConverter, offsetKeySerializer, offsetValueDeserializer);
|
||||||
OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.getConnector(),
|
OffsetStorageWriter<K, V> offsetWriter = new OffsetStorageWriter<>(offsetBackingStore, id.getConnector(),
|
||||||
keyConverter, valueConverter, offsetKeySerializer, offsetValueSerializer);
|
keyConverter, valueConverter, offsetKeySerializer, offsetValueSerializer);
|
||||||
workerTask = new WorkerSourceTask(id, sourceTask, keyConverter, valueConverter, producer,
|
workerTask = new WorkerSourceTask<>(id, sourceTask, keyConverter, valueConverter, producer,
|
||||||
offsetReader, offsetWriter, config, time);
|
offsetReader, offsetWriter, config, time);
|
||||||
} else if (task instanceof SinkTask) {
|
} else if (task instanceof SinkTask) {
|
||||||
workerTask = new WorkerSinkTask(id, (SinkTask) task, config, keyConverter, valueConverter, time);
|
workerTask = new WorkerSinkTask<>(id, (SinkTask) task, config, keyConverter, valueConverter, time);
|
||||||
} else {
|
} else {
|
||||||
log.error("Tasks must be a subclass of either SourceTask or SinkTask", task);
|
log.error("Tasks must be a subclass of either SourceTask or SinkTask", task);
|
||||||
throw new CopycatException("Tasks must be a subclass of either SourceTask or SinkTask");
|
throw new CopycatException("Tasks must be a subclass of either SourceTask or SinkTask");
|
||||||
|
|
|
@ -38,21 +38,21 @@ import java.util.concurrent.TimeUnit;
|
||||||
/**
|
/**
|
||||||
* WorkerTask that uses a SinkTask to export data from Kafka.
|
* WorkerTask that uses a SinkTask to export data from Kafka.
|
||||||
*/
|
*/
|
||||||
public class WorkerSinkTask implements WorkerTask {
|
public class WorkerSinkTask<K, V> implements WorkerTask {
|
||||||
private static final Logger log = LoggerFactory.getLogger(WorkerSinkTask.class);
|
private static final Logger log = LoggerFactory.getLogger(WorkerSinkTask.class);
|
||||||
|
|
||||||
private final ConnectorTaskId id;
|
private final ConnectorTaskId id;
|
||||||
private final SinkTask task;
|
private final SinkTask task;
|
||||||
private final WorkerConfig workerConfig;
|
private final WorkerConfig workerConfig;
|
||||||
private final Time time;
|
private final Time time;
|
||||||
private final Converter keyConverter;
|
private final Converter<K> keyConverter;
|
||||||
private final Converter valueConverter;
|
private final Converter<V> valueConverter;
|
||||||
private WorkerSinkTaskThread workThread;
|
private WorkerSinkTaskThread workThread;
|
||||||
private KafkaConsumer<Object, Object> consumer;
|
private KafkaConsumer<K, V> consumer;
|
||||||
private final SinkTaskContext context;
|
private final SinkTaskContext context;
|
||||||
|
|
||||||
public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConfig,
|
public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConfig,
|
||||||
Converter keyConverter, Converter valueConverter, Time time) {
|
Converter<K> keyConverter, Converter<V> valueConverter, Time time) {
|
||||||
this.id = id;
|
this.id = id;
|
||||||
this.task = task;
|
this.task = task;
|
||||||
this.workerConfig = workerConfig;
|
this.workerConfig = workerConfig;
|
||||||
|
@ -107,7 +107,7 @@ public class WorkerSinkTask implements WorkerTask {
|
||||||
public void poll(long timeoutMs) {
|
public void poll(long timeoutMs) {
|
||||||
try {
|
try {
|
||||||
log.trace("{} polling consumer with timeout {} ms", id, timeoutMs);
|
log.trace("{} polling consumer with timeout {} ms", id, timeoutMs);
|
||||||
ConsumerRecords<Object, Object> msgs = consumer.poll(timeoutMs);
|
ConsumerRecords<K, V> msgs = consumer.poll(timeoutMs);
|
||||||
log.trace("{} polling returned {} messages", id, msgs.count());
|
log.trace("{} polling returned {} messages", id, msgs.count());
|
||||||
deliverMessages(msgs);
|
deliverMessages(msgs);
|
||||||
} catch (ConsumerWakeupException we) {
|
} catch (ConsumerWakeupException we) {
|
||||||
|
@ -153,7 +153,7 @@ public class WorkerSinkTask implements WorkerTask {
|
||||||
return workerConfig;
|
return workerConfig;
|
||||||
}
|
}
|
||||||
|
|
||||||
private KafkaConsumer<Object, Object> createConsumer(Properties taskProps) {
|
private KafkaConsumer<K, V> createConsumer(Properties taskProps) {
|
||||||
String topicsStr = taskProps.getProperty(SinkTask.TOPICS_CONFIG);
|
String topicsStr = taskProps.getProperty(SinkTask.TOPICS_CONFIG);
|
||||||
if (topicsStr == null || topicsStr.isEmpty())
|
if (topicsStr == null || topicsStr.isEmpty())
|
||||||
throw new CopycatRuntimeException("Sink tasks require a list of topics.");
|
throw new CopycatRuntimeException("Sink tasks require a list of topics.");
|
||||||
|
@ -172,7 +172,7 @@ public class WorkerSinkTask implements WorkerTask {
|
||||||
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
|
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
|
||||||
workerConfig.getClass(WorkerConfig.VALUE_DESERIALIZER_CLASS_CONFIG).getName());
|
workerConfig.getClass(WorkerConfig.VALUE_DESERIALIZER_CLASS_CONFIG).getName());
|
||||||
|
|
||||||
KafkaConsumer<Object, Object> newConsumer;
|
KafkaConsumer<K, V> newConsumer;
|
||||||
try {
|
try {
|
||||||
newConsumer = new KafkaConsumer<>(props);
|
newConsumer = new KafkaConsumer<>(props);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
|
@ -201,11 +201,11 @@ public class WorkerSinkTask implements WorkerTask {
|
||||||
return new WorkerSinkTaskThread(this, "WorkerSinkTask-" + id, time, workerConfig);
|
return new WorkerSinkTaskThread(this, "WorkerSinkTask-" + id, time, workerConfig);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void deliverMessages(ConsumerRecords<Object, Object> msgs) {
|
private void deliverMessages(ConsumerRecords<K, V> msgs) {
|
||||||
// Finally, deliver this batch to the sink
|
// Finally, deliver this batch to the sink
|
||||||
if (msgs.count() > 0) {
|
if (msgs.count() > 0) {
|
||||||
List<SinkRecord> records = new ArrayList<>();
|
List<SinkRecord> records = new ArrayList<>();
|
||||||
for (ConsumerRecord<Object, Object> msg : msgs) {
|
for (ConsumerRecord<K, V> msg : msgs) {
|
||||||
log.trace("Consuming message with key {}, value {}", msg.key(), msg.value());
|
log.trace("Consuming message with key {}, value {}", msg.key(), msg.value());
|
||||||
records.add(
|
records.add(
|
||||||
new SinkRecord(msg.topic(), msg.partition(),
|
new SinkRecord(msg.topic(), msg.partition(),
|
||||||
|
|
|
@ -46,33 +46,33 @@ import java.util.concurrent.TimeoutException;
|
||||||
/**
|
/**
|
||||||
* WorkerTask that uses a SourceTask to ingest data into Kafka.
|
* WorkerTask that uses a SourceTask to ingest data into Kafka.
|
||||||
*/
|
*/
|
||||||
public class WorkerSourceTask implements WorkerTask {
|
public class WorkerSourceTask<K, V> implements WorkerTask {
|
||||||
private static final Logger log = LoggerFactory.getLogger(WorkerSourceTask.class);
|
private static final Logger log = LoggerFactory.getLogger(WorkerSourceTask.class);
|
||||||
|
|
||||||
private ConnectorTaskId id;
|
private ConnectorTaskId id;
|
||||||
private SourceTask task;
|
private SourceTask task;
|
||||||
private final Converter keyConverter;
|
private final Converter<K> keyConverter;
|
||||||
private final Converter valueConverter;
|
private final Converter<V> valueConverter;
|
||||||
private KafkaProducer<Object, Object> producer;
|
private KafkaProducer<K, V> producer;
|
||||||
private WorkerSourceTaskThread workThread;
|
private WorkerSourceTaskThread workThread;
|
||||||
private OffsetStorageReader offsetReader;
|
private OffsetStorageReader offsetReader;
|
||||||
private OffsetStorageWriter offsetWriter;
|
private OffsetStorageWriter<K, V> offsetWriter;
|
||||||
private final WorkerConfig workerConfig;
|
private final WorkerConfig workerConfig;
|
||||||
private final Time time;
|
private final Time time;
|
||||||
|
|
||||||
// Use IdentityHashMap to ensure correctness with duplicate records. This is a HashMap because
|
// Use IdentityHashMap to ensure correctness with duplicate records. This is a HashMap because
|
||||||
// there is no IdentityHashSet.
|
// there is no IdentityHashSet.
|
||||||
private IdentityHashMap<ProducerRecord<Object, Object>, ProducerRecord<Object, Object>>
|
private IdentityHashMap<ProducerRecord<K, V>, ProducerRecord<K, V>>
|
||||||
outstandingMessages;
|
outstandingMessages;
|
||||||
// A second buffer is used while an offset flush is running
|
// A second buffer is used while an offset flush is running
|
||||||
private IdentityHashMap<ProducerRecord<Object, Object>, ProducerRecord<Object, Object>>
|
private IdentityHashMap<ProducerRecord<K, V>, ProducerRecord<K, V>>
|
||||||
outstandingMessagesBacklog;
|
outstandingMessagesBacklog;
|
||||||
private boolean flushing;
|
private boolean flushing;
|
||||||
|
|
||||||
public WorkerSourceTask(ConnectorTaskId id, SourceTask task,
|
public WorkerSourceTask(ConnectorTaskId id, SourceTask task,
|
||||||
Converter keyConverter, Converter valueConverter,
|
Converter<K> keyConverter, Converter<V> valueConverter,
|
||||||
KafkaProducer<Object, Object> producer,
|
KafkaProducer<K, V> producer,
|
||||||
OffsetStorageReader offsetReader, OffsetStorageWriter offsetWriter,
|
OffsetStorageReader offsetReader, OffsetStorageWriter<K, V> offsetWriter,
|
||||||
WorkerConfig workerConfig, Time time) {
|
WorkerConfig workerConfig, Time time) {
|
||||||
this.id = id;
|
this.id = id;
|
||||||
this.task = task;
|
this.task = task;
|
||||||
|
@ -132,7 +132,7 @@ public class WorkerSourceTask implements WorkerTask {
|
||||||
*/
|
*/
|
||||||
private synchronized void sendRecords(List<SourceRecord> records) {
|
private synchronized void sendRecords(List<SourceRecord> records) {
|
||||||
for (SourceRecord record : records) {
|
for (SourceRecord record : records) {
|
||||||
final ProducerRecord<Object, Object> producerRecord
|
final ProducerRecord<K, V> producerRecord
|
||||||
= new ProducerRecord<>(record.getTopic(), record.getKafkaPartition(),
|
= new ProducerRecord<>(record.getTopic(), record.getKafkaPartition(),
|
||||||
keyConverter.fromCopycatData(record.getKey()),
|
keyConverter.fromCopycatData(record.getKey()),
|
||||||
valueConverter.fromCopycatData(record.getValue()));
|
valueConverter.fromCopycatData(record.getValue()));
|
||||||
|
@ -162,8 +162,8 @@ public class WorkerSourceTask implements WorkerTask {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void recordSent(final ProducerRecord<Object, Object> record) {
|
private synchronized void recordSent(final ProducerRecord<K, V> record) {
|
||||||
ProducerRecord<Object, Object> removed = outstandingMessages.remove(record);
|
ProducerRecord<K, V> removed = outstandingMessages.remove(record);
|
||||||
// While flushing, we may also see callbacks for items in the backlog
|
// While flushing, we may also see callbacks for items in the backlog
|
||||||
if (removed == null && flushing)
|
if (removed == null && flushing)
|
||||||
removed = outstandingMessagesBacklog.remove(record);
|
removed = outstandingMessagesBacklog.remove(record);
|
||||||
|
@ -275,8 +275,7 @@ public class WorkerSourceTask implements WorkerTask {
|
||||||
|
|
||||||
private void finishSuccessfulFlush() {
|
private void finishSuccessfulFlush() {
|
||||||
// If we were successful, we can just swap instead of replacing items back into the original map
|
// If we were successful, we can just swap instead of replacing items back into the original map
|
||||||
IdentityHashMap<ProducerRecord<Object, Object>, ProducerRecord<Object, Object>> temp
|
IdentityHashMap<ProducerRecord<K, V>, ProducerRecord<K, V>> temp = outstandingMessages;
|
||||||
= outstandingMessages;
|
|
||||||
outstandingMessages = outstandingMessagesBacklog;
|
outstandingMessages = outstandingMessagesBacklog;
|
||||||
outstandingMessagesBacklog = temp;
|
outstandingMessagesBacklog = temp;
|
||||||
flushing = false;
|
flushing = false;
|
||||||
|
|
|
@ -94,10 +94,14 @@ public class FileConfigStorage implements ConfigStorage {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
private void load() {
|
private void load() {
|
||||||
try {
|
try {
|
||||||
ObjectInputStream is = new ObjectInputStream(new FileInputStream(filename));
|
ObjectInputStream is = new ObjectInputStream(new FileInputStream(filename));
|
||||||
connectorConfig = (Map<String, Properties>) is.readObject();
|
Object data = is.readObject();
|
||||||
|
if (!(data instanceof Map))
|
||||||
|
throw new CopycatRuntimeException("Expected Map but found " + data.getClass());
|
||||||
|
connectorConfig = (Map<String, Properties>) data;
|
||||||
} catch (FileNotFoundException e) {
|
} catch (FileNotFoundException e) {
|
||||||
// Expected on first run
|
// Expected on first run
|
||||||
} catch (IOException | ClassNotFoundException e) {
|
} catch (IOException | ClassNotFoundException e) {
|
||||||
|
|
|
@ -62,11 +62,14 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
|
||||||
log.info("Stopped FileOffsetBackingStore");
|
log.info("Stopped FileOffsetBackingStore");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
private void load() {
|
private void load() {
|
||||||
try {
|
try {
|
||||||
ObjectInputStream is = new ObjectInputStream(new FileInputStream(file));
|
ObjectInputStream is = new ObjectInputStream(new FileInputStream(file));
|
||||||
HashMap<String, Map<byte[], byte[]>> raw
|
Object obj = is.readObject();
|
||||||
= (HashMap<String, Map<byte[], byte[]>>) is.readObject();
|
if (!(obj instanceof HashMap))
|
||||||
|
throw new CopycatRuntimeException("Expected HashMap but found " + obj.getClass());
|
||||||
|
HashMap<String, Map<byte[], byte[]>> raw = (HashMap<String, Map<byte[], byte[]>>) obj;
|
||||||
data = new HashMap<>();
|
data = new HashMap<>();
|
||||||
for (Map.Entry<String, Map<byte[], byte[]>> entry : raw.entrySet()) {
|
for (Map.Entry<String, Map<byte[], byte[]>> entry : raw.entrySet()) {
|
||||||
HashMap<ByteBuffer, ByteBuffer> converted = new HashMap<>();
|
HashMap<ByteBuffer, ByteBuffer> converted = new HashMap<>();
|
||||||
|
|
|
@ -34,20 +34,19 @@ import java.util.Map;
|
||||||
* directly, the interface is only separate from this implementation because it needs to be
|
* directly, the interface is only separate from this implementation because it needs to be
|
||||||
* included in the public API package.
|
* included in the public API package.
|
||||||
*/
|
*/
|
||||||
public class OffsetStorageReaderImpl implements OffsetStorageReader {
|
public class OffsetStorageReaderImpl<K, V> implements OffsetStorageReader {
|
||||||
private static final Logger log = LoggerFactory.getLogger(OffsetStorageReaderImpl.class);
|
private static final Logger log = LoggerFactory.getLogger(OffsetStorageReaderImpl.class);
|
||||||
|
|
||||||
private final OffsetBackingStore backingStore;
|
private final OffsetBackingStore backingStore;
|
||||||
private final String namespace;
|
private final String namespace;
|
||||||
private final Converter keyConverter;
|
private final Converter<K> keyConverter;
|
||||||
private final Converter valueConverter;
|
private final Converter<V> valueConverter;
|
||||||
private final Serializer keySerializer;
|
private final Serializer<K> keySerializer;
|
||||||
private final Deserializer valueDeserializer;
|
private final Deserializer<V> valueDeserializer;
|
||||||
|
|
||||||
public OffsetStorageReaderImpl(OffsetBackingStore backingStore, String namespace,
|
public OffsetStorageReaderImpl(OffsetBackingStore backingStore, String namespace,
|
||||||
Converter keyConverter, Converter valueConverter,
|
Converter<K> keyConverter, Converter<V> valueConverter,
|
||||||
Serializer keySerializer,
|
Serializer<K> keySerializer, Deserializer<V> valueDeserializer) {
|
||||||
Deserializer valueDeserializer) {
|
|
||||||
this.backingStore = backingStore;
|
this.backingStore = backingStore;
|
||||||
this.namespace = namespace;
|
this.namespace = namespace;
|
||||||
this.keyConverter = keyConverter;
|
this.keyConverter = keyConverter;
|
||||||
|
|
|
@ -63,15 +63,16 @@ import java.util.concurrent.Future;
|
||||||
* This class is not thread-safe. It should only be accessed from a Task's processing thread.
|
* This class is not thread-safe. It should only be accessed from a Task's processing thread.
|
||||||
* </p>
|
* </p>
|
||||||
*/
|
*/
|
||||||
public class OffsetStorageWriter {
|
public class OffsetStorageWriter<K, V> {
|
||||||
private static final Logger log = LoggerFactory.getLogger(OffsetStorageWriter.class);
|
private static final Logger log = LoggerFactory.getLogger(OffsetStorageWriter.class);
|
||||||
|
|
||||||
private final OffsetBackingStore backingStore;
|
private final OffsetBackingStore backingStore;
|
||||||
private final Converter keyConverter;
|
private final Converter<K> keyConverter;
|
||||||
private final Converter valueConverter;
|
private final Converter<V> valueConverter;
|
||||||
private final Serializer keySerializer;
|
private final Serializer<K> keySerializer;
|
||||||
private final Serializer valueSerializer;
|
private final Serializer<V> valueSerializer;
|
||||||
private final String namespace;
|
private final String namespace;
|
||||||
|
// Offset data in Copycat format
|
||||||
private Map<Object, Object> data = new HashMap<>();
|
private Map<Object, Object> data = new HashMap<>();
|
||||||
|
|
||||||
// Not synchronized, should only be accessed by flush thread
|
// Not synchronized, should only be accessed by flush thread
|
||||||
|
@ -80,8 +81,8 @@ public class OffsetStorageWriter {
|
||||||
private long currentFlushId = 0;
|
private long currentFlushId = 0;
|
||||||
|
|
||||||
public OffsetStorageWriter(OffsetBackingStore backingStore,
|
public OffsetStorageWriter(OffsetBackingStore backingStore,
|
||||||
String namespace, Converter keyConverter, Converter valueConverter,
|
String namespace, Converter<K> keyConverter, Converter<V> valueConverter,
|
||||||
Serializer keySerializer, Serializer valueSerializer) {
|
Serializer<K> keySerializer, Serializer<V> valueSerializer) {
|
||||||
this.backingStore = backingStore;
|
this.backingStore = backingStore;
|
||||||
this.namespace = namespace;
|
this.namespace = namespace;
|
||||||
this.keyConverter = keyConverter;
|
this.keyConverter = keyConverter;
|
||||||
|
@ -90,6 +91,11 @@ public class OffsetStorageWriter {
|
||||||
this.valueSerializer = valueSerializer;
|
this.valueSerializer = valueSerializer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set an offset for a partition using Copycat data values
|
||||||
|
* @param partition the partition to store an offset for
|
||||||
|
* @param offset the offset
|
||||||
|
*/
|
||||||
public synchronized void setOffset(Object partition, Object offset) {
|
public synchronized void setOffset(Object partition, Object offset) {
|
||||||
data.put(partition, offset);
|
data.put(partition, offset);
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,12 +21,12 @@ import java.util.concurrent.*;
|
||||||
|
|
||||||
public class FutureCallback<T> implements Callback<T>, Future<T> {
|
public class FutureCallback<T> implements Callback<T>, Future<T> {
|
||||||
|
|
||||||
private Callback underlying;
|
private Callback<T> underlying;
|
||||||
private CountDownLatch finishedLatch;
|
private CountDownLatch finishedLatch;
|
||||||
private T result = null;
|
private T result = null;
|
||||||
private Throwable exception = null;
|
private Throwable exception = null;
|
||||||
|
|
||||||
public FutureCallback(Callback underlying) {
|
public FutureCallback(Callback<T> underlying) {
|
||||||
this.underlying = underlying;
|
this.underlying = underlying;
|
||||||
this.finishedLatch = new CountDownLatch(1);
|
this.finishedLatch = new CountDownLatch(1);
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,6 +32,7 @@ import org.easymock.*;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.powermock.api.easymock.PowerMock;
|
import org.powermock.api.easymock.PowerMock;
|
||||||
|
import org.powermock.api.easymock.annotation.Mock;
|
||||||
import org.powermock.core.classloader.annotations.PowerMockIgnore;
|
import org.powermock.core.classloader.annotations.PowerMockIgnore;
|
||||||
import org.powermock.core.classloader.annotations.PrepareForTest;
|
import org.powermock.core.classloader.annotations.PrepareForTest;
|
||||||
import org.powermock.modules.junit4.PowerMockRunner;
|
import org.powermock.modules.junit4.PowerMockRunner;
|
||||||
|
@ -46,33 +47,36 @@ import static org.junit.Assert.assertEquals;
|
||||||
@PowerMockIgnore("javax.management.*")
|
@PowerMockIgnore("javax.management.*")
|
||||||
public class WorkerSinkTaskTest extends ThreadedTest {
|
public class WorkerSinkTaskTest extends ThreadedTest {
|
||||||
|
|
||||||
// These are fixed to keep this code simpler
|
// These are fixed to keep this code simpler. In this example we assume byte[] raw values
|
||||||
|
// with mix of integer/string in Copycat
|
||||||
private static final String TOPIC = "test";
|
private static final String TOPIC = "test";
|
||||||
private static final int PARTITION = 12;
|
private static final int PARTITION = 12;
|
||||||
private static final long FIRST_OFFSET = 45;
|
private static final long FIRST_OFFSET = 45;
|
||||||
private static final String KEY = "KEY";
|
private static final int KEY = 12;
|
||||||
private static final String VALUE = "VALUE";
|
private static final String VALUE = "VALUE";
|
||||||
private static final String TOPIC_PARTITION_STR = "test-12";
|
private static final byte[] RAW_KEY = "key".getBytes();
|
||||||
|
private static final byte[] RAW_VALUE = "value".getBytes();
|
||||||
|
|
||||||
private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, PARTITION);
|
private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, PARTITION);
|
||||||
|
|
||||||
private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
|
private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
|
||||||
private Time time;
|
private Time time;
|
||||||
private SinkTask sinkTask;
|
@Mock private SinkTask sinkTask;
|
||||||
private WorkerConfig workerConfig;
|
private WorkerConfig workerConfig;
|
||||||
private Converter keyConverter;
|
@Mock private Converter<byte[]> keyConverter;
|
||||||
private Converter valueConverter;
|
@Mock
|
||||||
private WorkerSinkTask workerTask;
|
private Converter<byte[]> valueConverter;
|
||||||
private KafkaConsumer<Object, Object> consumer;
|
private WorkerSinkTask<Integer, String> workerTask;
|
||||||
|
@Mock private KafkaConsumer<byte[], byte[]> consumer;
|
||||||
private WorkerSinkTaskThread workerThread;
|
private WorkerSinkTaskThread workerThread;
|
||||||
|
|
||||||
private long recordsReturned;
|
private long recordsReturned;
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
@Override
|
@Override
|
||||||
public void setup() {
|
public void setup() {
|
||||||
super.setup();
|
super.setup();
|
||||||
time = new MockTime();
|
time = new MockTime();
|
||||||
sinkTask = PowerMock.createMock(SinkTask.class);
|
|
||||||
Properties workerProps = new Properties();
|
Properties workerProps = new Properties();
|
||||||
workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
|
workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
|
||||||
workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
|
workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
|
||||||
|
@ -81,8 +85,6 @@ public class WorkerSinkTaskTest extends ThreadedTest {
|
||||||
workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
|
workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
|
||||||
workerProps.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
|
workerProps.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
|
||||||
workerConfig = new WorkerConfig(workerProps);
|
workerConfig = new WorkerConfig(workerProps);
|
||||||
keyConverter = PowerMock.createMock(Converter.class);
|
|
||||||
valueConverter = PowerMock.createMock(Converter.class);
|
|
||||||
workerTask = PowerMock.createPartialMock(
|
workerTask = PowerMock.createPartialMock(
|
||||||
WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"},
|
WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"},
|
||||||
taskId, sinkTask, workerConfig, keyConverter, valueConverter, time);
|
taskId, sinkTask, workerConfig, keyConverter, valueConverter, time);
|
||||||
|
@ -129,19 +131,15 @@ public class WorkerSinkTaskTest extends ThreadedTest {
|
||||||
public void testDeliverConvertsData() throws Exception {
|
public void testDeliverConvertsData() throws Exception {
|
||||||
// Validate conversion is performed when data is delivered
|
// Validate conversion is performed when data is delivered
|
||||||
Integer record = 12;
|
Integer record = 12;
|
||||||
byte[] rawKey = "key".getBytes(), rawValue = "value".getBytes();
|
|
||||||
|
|
||||||
ConsumerRecords<Object, Object> records = new ConsumerRecords<>(
|
ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
|
||||||
Collections.singletonMap(
|
Collections.singletonMap(
|
||||||
new TopicPartition("topic", 0),
|
new TopicPartition("topic", 0),
|
||||||
Collections.singletonList(
|
Collections.singletonList(new ConsumerRecord<>("topic", 0, 0, RAW_KEY, RAW_VALUE))));
|
||||||
new ConsumerRecord<Object, Object>("topic", 0, 0, rawKey, rawValue))));
|
|
||||||
|
|
||||||
// Exact data doesn't matter, but should be passed directly to sink task
|
// Exact data doesn't matter, but should be passed directly to sink task
|
||||||
EasyMock.expect(keyConverter.toCopycatData(rawKey))
|
EasyMock.expect(keyConverter.toCopycatData(RAW_KEY)).andReturn(record);
|
||||||
.andReturn(record);
|
EasyMock.expect(valueConverter.toCopycatData(RAW_VALUE)).andReturn(record);
|
||||||
EasyMock.expect(valueConverter.toCopycatData(rawValue))
|
|
||||||
.andReturn(record);
|
|
||||||
Capture<Collection<SinkRecord>> capturedRecords
|
Capture<Collection<SinkRecord>> capturedRecords
|
||||||
= EasyMock.newCapture(CaptureType.ALL);
|
= EasyMock.newCapture(CaptureType.ALL);
|
||||||
sinkTask.put(EasyMock.capture(capturedRecords));
|
sinkTask.put(EasyMock.capture(capturedRecords));
|
||||||
|
@ -262,14 +260,13 @@ public class WorkerSinkTaskTest extends ThreadedTest {
|
||||||
PowerMock.verifyAll();
|
PowerMock.verifyAll();
|
||||||
}
|
}
|
||||||
|
|
||||||
private KafkaConsumer<Object, Object> expectInitializeTask(Properties taskProps)
|
private KafkaConsumer<byte[], byte[]> expectInitializeTask(Properties taskProps)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
sinkTask.initialize(EasyMock.anyObject(SinkTaskContext.class));
|
sinkTask.initialize(EasyMock.anyObject(SinkTaskContext.class));
|
||||||
PowerMock.expectLastCall();
|
PowerMock.expectLastCall();
|
||||||
sinkTask.start(taskProps);
|
sinkTask.start(taskProps);
|
||||||
PowerMock.expectLastCall();
|
PowerMock.expectLastCall();
|
||||||
|
|
||||||
consumer = PowerMock.createMock(KafkaConsumer.class);
|
|
||||||
PowerMock.expectPrivate(workerTask, "createConsumer", taskProps)
|
PowerMock.expectPrivate(workerTask, "createConsumer", taskProps)
|
||||||
.andReturn(consumer);
|
.andReturn(consumer);
|
||||||
workerThread = PowerMock.createPartialMock(WorkerSinkTaskThread.class, new String[]{"start"},
|
workerThread = PowerMock.createPartialMock(WorkerSinkTaskThread.class, new String[]{"start"},
|
||||||
|
@ -302,22 +299,23 @@ public class WorkerSinkTaskTest extends ThreadedTest {
|
||||||
// Stub out all the consumer stream/iterator responses, which we just want to verify occur,
|
// Stub out all the consumer stream/iterator responses, which we just want to verify occur,
|
||||||
// but don't care about the exact details here.
|
// but don't care about the exact details here.
|
||||||
EasyMock.expect(consumer.poll(EasyMock.anyLong())).andStubAnswer(
|
EasyMock.expect(consumer.poll(EasyMock.anyLong())).andStubAnswer(
|
||||||
new IAnswer<ConsumerRecords<Object, Object>>() {
|
new IAnswer<ConsumerRecords<byte[], byte[]>>() {
|
||||||
@Override
|
@Override
|
||||||
public ConsumerRecords<Object, Object> answer() throws Throwable {
|
public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
|
||||||
// "Sleep" so time will progress
|
// "Sleep" so time will progress
|
||||||
time.sleep(pollDelayMs);
|
time.sleep(pollDelayMs);
|
||||||
ConsumerRecords<Object, Object> records = new ConsumerRecords<>(
|
ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
|
||||||
Collections.singletonMap(new TopicPartition(TOPIC, PARTITION), Arrays.asList(
|
Collections.singletonMap(
|
||||||
new ConsumerRecord<Object, Object>(TOPIC, PARTITION,
|
new TopicPartition(TOPIC, PARTITION),
|
||||||
FIRST_OFFSET + recordsReturned, KEY,
|
Arrays.asList(
|
||||||
VALUE))));
|
new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, RAW_KEY, RAW_VALUE)
|
||||||
|
)));
|
||||||
recordsReturned++;
|
recordsReturned++;
|
||||||
return records;
|
return records;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
EasyMock.expect(keyConverter.toCopycatData(KEY)).andReturn(KEY).anyTimes();
|
EasyMock.expect(keyConverter.toCopycatData(RAW_KEY)).andReturn(KEY).anyTimes();
|
||||||
EasyMock.expect(valueConverter.toCopycatData(VALUE)).andReturn(VALUE).anyTimes();
|
EasyMock.expect(valueConverter.toCopycatData(RAW_VALUE)).andReturn(VALUE).anyTimes();
|
||||||
Capture<Collection<SinkRecord>> capturedRecords = EasyMock.newCapture(CaptureType.ALL);
|
Capture<Collection<SinkRecord>> capturedRecords = EasyMock.newCapture(CaptureType.ALL);
|
||||||
sinkTask.put(EasyMock.capture(capturedRecords));
|
sinkTask.put(EasyMock.capture(capturedRecords));
|
||||||
EasyMock.expectLastCall().anyTimes();
|
EasyMock.expectLastCall().anyTimes();
|
||||||
|
|
|
@ -37,9 +37,13 @@ import org.easymock.EasyMock;
|
||||||
import org.easymock.IAnswer;
|
import org.easymock.IAnswer;
|
||||||
import org.easymock.IExpectationSetters;
|
import org.easymock.IExpectationSetters;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
import org.powermock.api.easymock.PowerMock;
|
import org.powermock.api.easymock.PowerMock;
|
||||||
|
import org.powermock.api.easymock.annotation.Mock;
|
||||||
|
import org.powermock.modules.junit4.PowerMockRunner;
|
||||||
import org.powermock.reflect.Whitebox;
|
import org.powermock.reflect.Whitebox;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -51,27 +55,29 @@ import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
@RunWith(PowerMockRunner.class)
|
||||||
public class WorkerSourceTaskTest extends ThreadedTest {
|
public class WorkerSourceTaskTest extends ThreadedTest {
|
||||||
private static final byte[] PARTITION_BYTES = "partition".getBytes();
|
private static final byte[] PARTITION_BYTES = "partition".getBytes();
|
||||||
private static final byte[] OFFSET_BYTES = "offset-1".getBytes();
|
private static final byte[] OFFSET_BYTES = "offset-1".getBytes();
|
||||||
|
|
||||||
|
// Copycat-format data
|
||||||
private static final Integer KEY = -1;
|
private static final Integer KEY = -1;
|
||||||
private static final Integer RECORD = 12;
|
private static final Long RECORD = 12L;
|
||||||
// The actual format of this data doesn't matter -- we just want to see that the right version
|
// Native-formatted data. The actual format of this data doesn't matter -- we just want to see that the right version
|
||||||
// is used in the right place.
|
// is used in the right place.
|
||||||
private static final String CONVERTED_KEY = "converted-key";
|
private static final ByteBuffer CONVERTED_KEY = ByteBuffer.wrap("converted-key".getBytes());
|
||||||
private static final String CONVERTED_RECORD = "converted-record";
|
private static final String CONVERTED_RECORD = "converted-record";
|
||||||
|
|
||||||
private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
|
private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
|
||||||
private WorkerConfig config;
|
private WorkerConfig config;
|
||||||
private SourceTask sourceTask;
|
@Mock private SourceTask sourceTask;
|
||||||
private Converter keyConverter;
|
@Mock private Converter<ByteBuffer> keyConverter;
|
||||||
private Converter valueConverter;
|
@Mock private Converter<String> valueConverter;
|
||||||
private KafkaProducer<Object, Object> producer;
|
@Mock private KafkaProducer<ByteBuffer, String> producer;
|
||||||
private OffsetStorageReader offsetReader;
|
@Mock private OffsetStorageReader offsetReader;
|
||||||
private OffsetStorageWriter offsetWriter;
|
@Mock private OffsetStorageWriter<ByteBuffer, String> offsetWriter;
|
||||||
private WorkerSourceTask workerTask;
|
private WorkerSourceTask<ByteBuffer, String> workerTask;
|
||||||
private Future<RecordMetadata> sendFuture;
|
@Mock private Future<RecordMetadata> sendFuture;
|
||||||
|
|
||||||
private Capture<org.apache.kafka.clients.producer.Callback> producerCallbacks;
|
private Capture<org.apache.kafka.clients.producer.Callback> producerCallbacks;
|
||||||
|
|
||||||
|
@ -91,18 +97,11 @@ public class WorkerSourceTaskTest extends ThreadedTest {
|
||||||
workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
|
workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
|
||||||
workerProps.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
|
workerProps.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
|
||||||
config = new WorkerConfig(workerProps);
|
config = new WorkerConfig(workerProps);
|
||||||
sourceTask = PowerMock.createMock(SourceTask.class);
|
|
||||||
keyConverter = PowerMock.createMock(Converter.class);
|
|
||||||
valueConverter = PowerMock.createMock(Converter.class);
|
|
||||||
producer = PowerMock.createMock(KafkaProducer.class);
|
|
||||||
offsetReader = PowerMock.createMock(OffsetStorageReader.class);
|
|
||||||
offsetWriter = PowerMock.createMock(OffsetStorageWriter.class);
|
|
||||||
sendFuture = PowerMock.createMock(Future.class);
|
|
||||||
producerCallbacks = EasyMock.newCapture();
|
producerCallbacks = EasyMock.newCapture();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void createWorkerTask() {
|
private void createWorkerTask() {
|
||||||
workerTask = new WorkerSourceTask(taskId, sourceTask, keyConverter, valueConverter, producer,
|
workerTask = new WorkerSourceTask<>(taskId, sourceTask, keyConverter, valueConverter, producer,
|
||||||
offsetReader, offsetWriter, config, new SystemTime());
|
offsetReader, offsetWriter, config, new SystemTime());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -198,7 +197,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
|
||||||
// Can just use the same record for key and value
|
// Can just use the same record for key and value
|
||||||
records.add(new SourceRecord(PARTITION_BYTES, OFFSET_BYTES, "topic", null, KEY, RECORD));
|
records.add(new SourceRecord(PARTITION_BYTES, OFFSET_BYTES, "topic", null, KEY, RECORD));
|
||||||
|
|
||||||
Capture<ProducerRecord> sent = expectSendRecord();
|
Capture<ProducerRecord<ByteBuffer, String>> sent = expectSendRecord();
|
||||||
|
|
||||||
PowerMock.replayAll();
|
PowerMock.replayAll();
|
||||||
|
|
||||||
|
@ -228,11 +227,11 @@ public class WorkerSourceTaskTest extends ThreadedTest {
|
||||||
return latch;
|
return latch;
|
||||||
}
|
}
|
||||||
|
|
||||||
private Capture<ProducerRecord> expectSendRecord() throws InterruptedException {
|
private Capture<ProducerRecord<ByteBuffer, String>> expectSendRecord() throws InterruptedException {
|
||||||
EasyMock.expect(keyConverter.fromCopycatData(KEY)).andStubReturn(CONVERTED_KEY);
|
EasyMock.expect(keyConverter.fromCopycatData(KEY)).andStubReturn(CONVERTED_KEY);
|
||||||
EasyMock.expect(valueConverter.fromCopycatData(RECORD)).andStubReturn(CONVERTED_RECORD);
|
EasyMock.expect(valueConverter.fromCopycatData(RECORD)).andStubReturn(CONVERTED_RECORD);
|
||||||
|
|
||||||
Capture<ProducerRecord> sent = EasyMock.newCapture();
|
Capture<ProducerRecord<ByteBuffer, String>> sent = EasyMock.newCapture();
|
||||||
// 1. Converted data passed to the producer, which will need callbacks invoked for flush to work
|
// 1. Converted data passed to the producer, which will need callbacks invoked for flush to work
|
||||||
EasyMock.expect(
|
EasyMock.expect(
|
||||||
producer.send(EasyMock.capture(sent),
|
producer.send(EasyMock.capture(sent),
|
||||||
|
@ -260,11 +259,11 @@ public class WorkerSourceTaskTest extends ThreadedTest {
|
||||||
latch.await(1000, TimeUnit.MILLISECONDS);
|
latch.await(1000, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
private void expectOffsetFlush(boolean succeed) throws Exception {
|
private void expectOffsetFlush(boolean succeed) throws Exception {
|
||||||
EasyMock.expect(offsetWriter.beginFlush()).andReturn(true);
|
EasyMock.expect(offsetWriter.beginFlush()).andReturn(true);
|
||||||
Future<Void> flushFuture = PowerMock.createMock(Future.class);
|
Future<Void> flushFuture = PowerMock.createMock(Future.class);
|
||||||
EasyMock.expect(offsetWriter.doFlush(EasyMock.anyObject(Callback.class)))
|
EasyMock.expect(offsetWriter.doFlush(EasyMock.anyObject(Callback.class))).andReturn(flushFuture);
|
||||||
.andReturn(flushFuture);
|
|
||||||
// Should throw for failure
|
// Should throw for failure
|
||||||
IExpectationSetters<Void> futureGetExpect = EasyMock.expect(
|
IExpectationSetters<Void> futureGetExpect = EasyMock.expect(
|
||||||
flushFuture.get(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class)));
|
flushFuture.get(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class)));
|
||||||
|
|
|
@ -82,8 +82,7 @@ public class WorkerTest extends ThreadedTest {
|
||||||
WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
|
WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
|
||||||
|
|
||||||
PowerMock.mockStatic(Worker.class);
|
PowerMock.mockStatic(Worker.class);
|
||||||
PowerMock.expectPrivate(Worker.class, "instantiateTask", TestSourceTask.class.getName())
|
PowerMock.expectPrivate(Worker.class, "instantiateTask", TestSourceTask.class.getName()).andReturn(task);
|
||||||
.andReturn(task);
|
|
||||||
|
|
||||||
PowerMock.expectNew(
|
PowerMock.expectNew(
|
||||||
WorkerSourceTask.class, EasyMock.eq(taskId), EasyMock.eq(task),
|
WorkerSourceTask.class, EasyMock.eq(taskId), EasyMock.eq(task),
|
||||||
|
@ -128,8 +127,7 @@ public class WorkerTest extends ThreadedTest {
|
||||||
WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
|
WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
|
||||||
|
|
||||||
PowerMock.mockStatic(Worker.class);
|
PowerMock.mockStatic(Worker.class);
|
||||||
PowerMock.expectPrivate(Worker.class, "instantiateTask", TestSourceTask.class.getName())
|
PowerMock.expectPrivate(Worker.class, "instantiateTask", TestSourceTask.class.getName()).andReturn(task);
|
||||||
.andReturn(task);
|
|
||||||
|
|
||||||
PowerMock.expectNew(
|
PowerMock.expectNew(
|
||||||
WorkerSourceTask.class, EasyMock.eq(taskId), EasyMock.eq(task),
|
WorkerSourceTask.class, EasyMock.eq(taskId), EasyMock.eq(task),
|
||||||
|
@ -162,7 +160,7 @@ public class WorkerTest extends ThreadedTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private static class TestSourceTask extends SourceTask<Object, Object> {
|
private static class TestSourceTask extends SourceTask {
|
||||||
public TestSourceTask() {
|
public TestSourceTask() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,9 +18,7 @@
|
||||||
package org.apache.kafka.copycat.runtime.standalone;
|
package org.apache.kafka.copycat.runtime.standalone;
|
||||||
|
|
||||||
import org.apache.kafka.copycat.runtime.ConnectorConfig;
|
import org.apache.kafka.copycat.runtime.ConnectorConfig;
|
||||||
import org.apache.kafka.copycat.runtime.Worker;
|
|
||||||
import org.apache.kafka.copycat.sink.SinkConnector;
|
import org.apache.kafka.copycat.sink.SinkConnector;
|
||||||
import org.apache.kafka.copycat.util.Callback;
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -41,7 +39,6 @@ public class StandaloneCoordinatorRestoreTest extends StandaloneCoordinatorTestB
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() throws Exception {
|
public void setup() throws Exception {
|
||||||
worker = PowerMock.createMock(Worker.class);
|
|
||||||
Properties coordinatorProps = new Properties();
|
Properties coordinatorProps = new Properties();
|
||||||
coordinatorProps.setProperty(StandaloneCoordinator.STORAGE_CONFIG,
|
coordinatorProps.setProperty(StandaloneCoordinator.STORAGE_CONFIG,
|
||||||
FileConfigStorage.class.getName());
|
FileConfigStorage.class.getName());
|
||||||
|
@ -50,7 +47,6 @@ public class StandaloneCoordinatorRestoreTest extends StandaloneCoordinatorTestB
|
||||||
coordinatorProps.setProperty(FileConfigStorage.FILE_CONFIG,
|
coordinatorProps.setProperty(FileConfigStorage.FILE_CONFIG,
|
||||||
coordinatorConfigFile.getAbsolutePath());
|
coordinatorConfigFile.getAbsolutePath());
|
||||||
coordinator = new StandaloneCoordinator(worker, coordinatorProps);
|
coordinator = new StandaloneCoordinator(worker, coordinatorProps);
|
||||||
createCallback = PowerMock.createMock(Callback.class);
|
|
||||||
|
|
||||||
connectorProps = new Properties();
|
connectorProps = new Properties();
|
||||||
connectorProps.setProperty(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
|
connectorProps.setProperty(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
|
||||||
|
|
|
@ -42,7 +42,6 @@ public class StandaloneCoordinatorTest extends StandaloneCoordinatorTestBase {
|
||||||
public void setup() {
|
public void setup() {
|
||||||
worker = PowerMock.createMock(Worker.class);
|
worker = PowerMock.createMock(Worker.class);
|
||||||
coordinator = new StandaloneCoordinator(worker, new Properties());
|
coordinator = new StandaloneCoordinator(worker, new Properties());
|
||||||
createCallback = PowerMock.createMock(Callback.class);
|
|
||||||
|
|
||||||
connectorProps = new Properties();
|
connectorProps = new Properties();
|
||||||
connectorProps.setProperty(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
|
connectorProps.setProperty(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
|
||||||
|
@ -85,9 +84,9 @@ public class StandaloneCoordinatorTest extends StandaloneCoordinatorTestBase {
|
||||||
PowerMock.replayAll();
|
PowerMock.replayAll();
|
||||||
|
|
||||||
coordinator.addConnector(connectorProps, createCallback);
|
coordinator.addConnector(connectorProps, createCallback);
|
||||||
FutureCallback<Void> futureCb = new FutureCallback<>(new Callback() {
|
FutureCallback<Void> futureCb = new FutureCallback<>(new Callback<Void>() {
|
||||||
@Override
|
@Override
|
||||||
public void onCompletion(Throwable error, Object result) {
|
public void onCompletion(Throwable error, Void result) {
|
||||||
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.kafka.copycat.util.Callback;
|
||||||
import org.apache.kafka.copycat.util.ConnectorTaskId;
|
import org.apache.kafka.copycat.util.ConnectorTaskId;
|
||||||
import org.easymock.EasyMock;
|
import org.easymock.EasyMock;
|
||||||
import org.powermock.api.easymock.PowerMock;
|
import org.powermock.api.easymock.PowerMock;
|
||||||
|
import org.powermock.api.easymock.annotation.Mock;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
@ -40,9 +41,9 @@ public class StandaloneCoordinatorTestBase {
|
||||||
protected static final String TOPICS_LIST_STR = "topic1,topic2";
|
protected static final String TOPICS_LIST_STR = "topic1,topic2";
|
||||||
|
|
||||||
protected StandaloneCoordinator coordinator;
|
protected StandaloneCoordinator coordinator;
|
||||||
protected Worker worker;
|
@Mock protected Worker worker;
|
||||||
protected Connector connector;
|
protected Connector connector;
|
||||||
protected Callback<String> createCallback;
|
@Mock protected Callback<String> createCallback;
|
||||||
|
|
||||||
protected Properties connectorProps;
|
protected Properties connectorProps;
|
||||||
protected Properties taskProps;
|
protected Properties taskProps;
|
||||||
|
|
|
@ -71,8 +71,7 @@ public class FileOffsetBackingStoreTest {
|
||||||
|
|
||||||
store.set("namespace", firstSet, setCallback).get();
|
store.set("namespace", firstSet, setCallback).get();
|
||||||
|
|
||||||
Map<ByteBuffer, ByteBuffer> values
|
Map<ByteBuffer, ByteBuffer> values = store.get("namespace", Arrays.asList(buffer("key"), buffer("bad")), getCallback).get();
|
||||||
= store.get("namespace", Arrays.asList(buffer("key"), buffer("bad")), getCallback).get();
|
|
||||||
assertEquals(buffer("value"), values.get(buffer("key")));
|
assertEquals(buffer("value"), values.get(buffer("key")));
|
||||||
assertEquals(null, values.get(buffer("bad")));
|
assertEquals(null, values.get(buffer("bad")));
|
||||||
|
|
||||||
|
@ -92,8 +91,7 @@ public class FileOffsetBackingStoreTest {
|
||||||
FileOffsetBackingStore restore = new FileOffsetBackingStore();
|
FileOffsetBackingStore restore = new FileOffsetBackingStore();
|
||||||
restore.configure(props);
|
restore.configure(props);
|
||||||
restore.start();
|
restore.start();
|
||||||
Map<ByteBuffer, ByteBuffer> values
|
Map<ByteBuffer, ByteBuffer> values = restore.get("namespace", Arrays.asList(buffer("key")), getCallback).get();
|
||||||
= restore.get("namespace", Arrays.asList(buffer("key")), getCallback).get();
|
|
||||||
assertEquals(buffer("value"), values.get(buffer("key")));
|
assertEquals(buffer("value"), values.get(buffer("key")));
|
||||||
|
|
||||||
PowerMock.verifyAll();
|
PowerMock.verifyAll();
|
||||||
|
@ -104,15 +102,16 @@ public class FileOffsetBackingStoreTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
private Callback<Void> expectSuccessfulSetCallback() {
|
private Callback<Void> expectSuccessfulSetCallback() {
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
Callback<Void> setCallback = PowerMock.createMock(Callback.class);
|
Callback<Void> setCallback = PowerMock.createMock(Callback.class);
|
||||||
setCallback.onCompletion(null, null);
|
setCallback.onCompletion(EasyMock.isNull(Throwable.class), EasyMock.isNull(Void.class));
|
||||||
PowerMock.expectLastCall();
|
PowerMock.expectLastCall();
|
||||||
return setCallback;
|
return setCallback;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
private Callback<Map<ByteBuffer, ByteBuffer>> expectSuccessfulGetCallback() {
|
private Callback<Map<ByteBuffer, ByteBuffer>> expectSuccessfulGetCallback() {
|
||||||
Callback<Map<ByteBuffer, ByteBuffer>> getCallback
|
Callback<Map<ByteBuffer, ByteBuffer>> getCallback = PowerMock.createMock(Callback.class);
|
||||||
= PowerMock.createMock(Callback.class);
|
|
||||||
getCallback.onCompletion(EasyMock.isNull(Throwable.class), EasyMock.anyObject(Map.class));
|
getCallback.onCompletion(EasyMock.isNull(Throwable.class), EasyMock.anyObject(Map.class));
|
||||||
PowerMock.expectLastCall();
|
PowerMock.expectLastCall();
|
||||||
return getCallback;
|
return getCallback;
|
||||||
|
|
|
@ -26,34 +26,43 @@ import org.easymock.IAnswer;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
import org.powermock.api.easymock.PowerMock;
|
import org.powermock.api.easymock.PowerMock;
|
||||||
|
import org.powermock.api.easymock.annotation.Mock;
|
||||||
|
import org.powermock.modules.junit4.PowerMockRunner;
|
||||||
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.*;
|
import java.util.concurrent.*;
|
||||||
|
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
@RunWith(PowerMockRunner.class)
|
||||||
public class OffsetStorageWriterTest {
|
public class OffsetStorageWriterTest {
|
||||||
private static final String NAMESPACE = "namespace";
|
private static final String NAMESPACE = "namespace";
|
||||||
private static final String OFFSET_KEY = "key";
|
// Copycat format - any types should be accepted here
|
||||||
|
private static final List<String> OFFSET_KEY = Arrays.asList("key", "key");
|
||||||
private static final String OFFSET_VALUE = "value";
|
private static final String OFFSET_VALUE = "value";
|
||||||
private static final String OFFSET_KEY_CONVERTED = "key-converted";
|
// Native objects - must match serializer types
|
||||||
|
private static final int OFFSET_KEY_CONVERTED = 12;
|
||||||
private static final String OFFSET_VALUE_CONVERTED = "value-converted";
|
private static final String OFFSET_VALUE_CONVERTED = "value-converted";
|
||||||
|
// Serialized
|
||||||
private static final byte[] OFFSET_KEY_SERIALIZED = "key-serialized".getBytes();
|
private static final byte[] OFFSET_KEY_SERIALIZED = "key-serialized".getBytes();
|
||||||
private static final byte[] OFFSET_VALUE_SERIALIZED = "value-serialized".getBytes();
|
private static final byte[] OFFSET_VALUE_SERIALIZED = "value-serialized".getBytes();
|
||||||
private static final Map<ByteBuffer, ByteBuffer> OFFSETS_SERIALIZED
|
private static final Map<ByteBuffer, ByteBuffer> OFFSETS_SERIALIZED
|
||||||
= Collections.singletonMap(ByteBuffer.wrap(OFFSET_KEY_SERIALIZED),
|
= Collections.singletonMap(ByteBuffer.wrap(OFFSET_KEY_SERIALIZED),
|
||||||
ByteBuffer.wrap(OFFSET_VALUE_SERIALIZED));
|
ByteBuffer.wrap(OFFSET_VALUE_SERIALIZED));
|
||||||
|
|
||||||
private OffsetBackingStore store;
|
@Mock private OffsetBackingStore store;
|
||||||
private Converter keyConverter;
|
@Mock private Converter<Integer> keyConverter;
|
||||||
private Converter valueConverter;
|
@Mock private Converter<String> valueConverter;
|
||||||
private Serializer keySerializer;
|
@Mock private Serializer<Integer> keySerializer;
|
||||||
private Serializer valueSerializer;
|
@Mock private Serializer<String> valueSerializer;
|
||||||
private OffsetStorageWriter writer;
|
private OffsetStorageWriter<Integer, String> writer;
|
||||||
|
|
||||||
private static Exception exception = new RuntimeException("error");
|
private static Exception exception = new RuntimeException("error");
|
||||||
|
|
||||||
|
@ -61,13 +70,7 @@ public class OffsetStorageWriterTest {
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() {
|
public void setup() {
|
||||||
store = PowerMock.createMock(OffsetBackingStore.class);
|
writer = new OffsetStorageWriter<>(store, NAMESPACE, keyConverter, valueConverter, keySerializer, valueSerializer);
|
||||||
keyConverter = PowerMock.createMock(Converter.class);
|
|
||||||
valueConverter = PowerMock.createMock(Converter.class);
|
|
||||||
keySerializer = PowerMock.createMock(Serializer.class);
|
|
||||||
valueSerializer = PowerMock.createMock(Serializer.class);
|
|
||||||
writer = new OffsetStorageWriter(store, NAMESPACE, keyConverter, valueConverter, keySerializer, valueSerializer);
|
|
||||||
|
|
||||||
service = Executors.newFixedThreadPool(1);
|
service = Executors.newFixedThreadPool(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -78,6 +81,7 @@ public class OffsetStorageWriterTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testWriteFlush() throws Exception {
|
public void testWriteFlush() throws Exception {
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
Callback<Void> callback = PowerMock.createMock(Callback.class);
|
Callback<Void> callback = PowerMock.createMock(Callback.class);
|
||||||
expectStore(callback, false);
|
expectStore(callback, false);
|
||||||
|
|
||||||
|
@ -109,6 +113,7 @@ public class OffsetStorageWriterTest {
|
||||||
// When a flush fails, we shouldn't just lose the offsets. Instead, they should be restored
|
// When a flush fails, we shouldn't just lose the offsets. Instead, they should be restored
|
||||||
// such that a subsequent flush will write them.
|
// such that a subsequent flush will write them.
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
final Callback<Void> callback = PowerMock.createMock(Callback.class);
|
final Callback<Void> callback = PowerMock.createMock(Callback.class);
|
||||||
// First time the write fails
|
// First time the write fails
|
||||||
expectStore(callback, true);
|
expectStore(callback, true);
|
||||||
|
@ -130,6 +135,7 @@ public class OffsetStorageWriterTest {
|
||||||
|
|
||||||
@Test(expected = CopycatRuntimeException.class)
|
@Test(expected = CopycatRuntimeException.class)
|
||||||
public void testAlreadyFlushing() throws Exception {
|
public void testAlreadyFlushing() throws Exception {
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
final Callback<Void> callback = PowerMock.createMock(Callback.class);
|
final Callback<Void> callback = PowerMock.createMock(Callback.class);
|
||||||
// Trigger the send, but don't invoke the callback so we'll still be mid-flush
|
// Trigger the send, but don't invoke the callback so we'll still be mid-flush
|
||||||
CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1);
|
CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1);
|
||||||
|
@ -158,6 +164,7 @@ public class OffsetStorageWriterTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCancelAfterAwaitFlush() throws Exception {
|
public void testCancelAfterAwaitFlush() throws Exception {
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
Callback<Void> callback = PowerMock.createMock(Callback.class);
|
Callback<Void> callback = PowerMock.createMock(Callback.class);
|
||||||
CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1);
|
CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1);
|
||||||
// In this test, the write should be cancelled so the callback will not be invoked and is not
|
// In this test, the write should be cancelled so the callback will not be invoked and is not
|
||||||
|
|
Loading…
Reference in New Issue