Split Copycat converter option into two options for key and value.

This commit is contained in:
Ewen Cheslack-Postava 2015-08-01 13:49:30 -07:00
parent 0b5a1a0c57
commit b194c7348f
12 changed files with 91 additions and 65 deletions

View File

@ -15,7 +15,8 @@
bootstrap.servers=localhost:9092 bootstrap.servers=localhost:9092
converter=org.apache.kafka.copycat.json.JsonConverter key.converter=org.apache.kafka.copycat.json.JsonConverter
value.converter=org.apache.kafka.copycat.json.JsonConverter
key.serializer=org.apache.kafka.copycat.json.JsonSerializer key.serializer=org.apache.kafka.copycat.json.JsonSerializer
value.serializer=org.apache.kafka.copycat.json.JsonSerializer value.serializer=org.apache.kafka.copycat.json.JsonSerializer
key.deserializer=org.apache.kafka.copycat.json.JsonDeserializer key.deserializer=org.apache.kafka.copycat.json.JsonDeserializer

View File

@ -16,7 +16,8 @@
# These are defaults. This file just demonstrates how to override some settings. # These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092 bootstrap.servers=localhost:9092
converter=org.apache.kafka.copycat.json.JsonConverter key.converter=org.apache.kafka.copycat.json.JsonConverter
value.converter=org.apache.kafka.copycat.json.JsonConverter
key.serializer=org.apache.kafka.copycat.json.JsonSerializer key.serializer=org.apache.kafka.copycat.json.JsonSerializer
value.serializer=org.apache.kafka.copycat.json.JsonSerializer value.serializer=org.apache.kafka.copycat.json.JsonSerializer
key.deserializer=org.apache.kafka.copycat.json.JsonDeserializer key.deserializer=org.apache.kafka.copycat.json.JsonDeserializer

View File

@ -49,9 +49,13 @@ public class WorkerConfig extends AbstractConfig {
+ "than one, though, in case a server is down)."; + "than one, though, in case a server is down).";
public static final String BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092"; public static final String BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092";
public static final String CONVERTER_CLASS_CONFIG = "converter"; public static final String KEY_CONVERTER_CLASS_CONFIG = "key.converter";
public static final String CONVERTER_CLASS_DOC = public static final String KEY_CONVERTER_CLASS_DOC =
"Converter class for Copycat data that implements the <code>Converter</code> interface."; "Converter class for key Copycat data that implements the <code>Converter</code> interface.";
public static final String VALUE_CONVERTER_CLASS_CONFIG = "value.converter";
public static final String VALUE_CONVERTER_CLASS_DOC =
"Converter class for value Copycat data that implements the <code>Converter</code> interface.";
public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer"; public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
public static final String KEY_SERIALIZER_CLASS_DOC = public static final String KEY_SERIALIZER_CLASS_DOC =
@ -101,8 +105,10 @@ public class WorkerConfig extends AbstractConfig {
.define(CLUSTER_CONFIG, Type.STRING, CLUSTER_DEFAULT, Importance.HIGH, CLUSTER_CONFIG_DOC) .define(CLUSTER_CONFIG, Type.STRING, CLUSTER_DEFAULT, Importance.HIGH, CLUSTER_CONFIG_DOC)
.define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, BOOTSTRAP_SERVERS_DEFAULT, .define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, BOOTSTRAP_SERVERS_DEFAULT,
Importance.HIGH, BOOTSTRAP_SERVERS_DOC) Importance.HIGH, BOOTSTRAP_SERVERS_DOC)
.define(CONVERTER_CLASS_CONFIG, Type.CLASS, .define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS,
Importance.HIGH, CONVERTER_CLASS_DOC) Importance.HIGH, KEY_CONVERTER_CLASS_DOC)
.define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
Importance.HIGH, VALUE_CONVERTER_CLASS_DOC)
.define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
Importance.HIGH, KEY_SERIALIZER_CLASS_DOC) Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
.define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,

View File

@ -53,7 +53,8 @@ public class Worker {
private Time time; private Time time;
private WorkerConfig config; private WorkerConfig config;
private Converter converter; private Converter keyConverter;
private Converter valueConverter;
private OffsetBackingStore offsetBackingStore; private OffsetBackingStore offsetBackingStore;
private Serializer offsetKeySerializer; private Serializer offsetKeySerializer;
private Serializer offsetValueSerializer; private Serializer offsetValueSerializer;
@ -74,7 +75,8 @@ public class Worker {
Deserializer offsetKeyDeserializer, Deserializer offsetValueDeserializer) { Deserializer offsetKeyDeserializer, Deserializer offsetValueDeserializer) {
this.time = time; this.time = time;
this.config = config; this.config = config;
this.converter = config.getConfiguredInstance(WorkerConfig.CONVERTER_CLASS_CONFIG, Converter.class); this.keyConverter = config.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
this.valueConverter = config.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
this.offsetBackingStore = offsetBackingStore; this.offsetBackingStore = offsetBackingStore;
if (offsetKeySerializer != null) { if (offsetKeySerializer != null) {
@ -183,17 +185,14 @@ public class Worker {
final WorkerTask workerTask; final WorkerTask workerTask;
if (task instanceof SourceTask) { if (task instanceof SourceTask) {
SourceTask sourceTask = (SourceTask) task; SourceTask sourceTask = (SourceTask) task;
OffsetStorageReader offsetReader OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.getConnector(),
= new OffsetStorageReaderImpl(offsetBackingStore, id.getConnector(), converter, keyConverter, valueConverter, offsetKeySerializer, offsetValueDeserializer);
offsetKeySerializer, offsetValueDeserializer); OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.getConnector(),
OffsetStorageWriter offsetWriter keyConverter, valueConverter, offsetKeySerializer, offsetValueSerializer);
= new OffsetStorageWriter(offsetBackingStore, id.getConnector(), converter, workerTask = new WorkerSourceTask(id, sourceTask, keyConverter, valueConverter, producer,
offsetKeySerializer, offsetValueSerializer); offsetReader, offsetWriter, config, time);
workerTask = new WorkerSourceTask(id, sourceTask, converter, producer,
offsetReader, offsetWriter,
config, time);
} else if (task instanceof SinkTask) { } else if (task instanceof SinkTask) {
workerTask = new WorkerSinkTask(id, (SinkTask) task, config, converter, time); workerTask = new WorkerSinkTask(id, (SinkTask) task, config, keyConverter, valueConverter, time);
} else { } else {
log.error("Tasks must be a subclass of either SourceTask or SinkTask", task); log.error("Tasks must be a subclass of either SourceTask or SinkTask", task);
throw new CopycatException("Tasks must be a subclass of either SourceTask or SinkTask"); throw new CopycatException("Tasks must be a subclass of either SourceTask or SinkTask");

View File

@ -45,17 +45,19 @@ public class WorkerSinkTask implements WorkerTask {
private final SinkTask task; private final SinkTask task;
private final WorkerConfig workerConfig; private final WorkerConfig workerConfig;
private final Time time; private final Time time;
private final Converter converter; private final Converter keyConverter;
private final Converter valueConverter;
private WorkerSinkTaskThread workThread; private WorkerSinkTaskThread workThread;
private KafkaConsumer<Object, Object> consumer; private KafkaConsumer<Object, Object> consumer;
private final SinkTaskContext context; private final SinkTaskContext context;
public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConfig, public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConfig,
Converter converter, Time time) { Converter keyConverter, Converter valueConverter, Time time) {
this.id = id; this.id = id;
this.task = task; this.task = task;
this.workerConfig = workerConfig; this.workerConfig = workerConfig;
this.converter = converter; this.keyConverter = keyConverter;
this.valueConverter = valueConverter;
context = new SinkTaskContextImpl(); context = new SinkTaskContextImpl();
this.time = time; this.time = time;
} }
@ -207,8 +209,8 @@ public class WorkerSinkTask implements WorkerTask {
log.trace("Consuming message with key {}, value {}", msg.key(), msg.value()); log.trace("Consuming message with key {}, value {}", msg.key(), msg.value());
records.add( records.add(
new SinkRecord(msg.topic(), msg.partition(), new SinkRecord(msg.topic(), msg.partition(),
converter.toCopycatData(msg.key()), keyConverter.toCopycatData(msg.key()),
converter.toCopycatData(msg.value()), valueConverter.toCopycatData(msg.value()),
msg.offset()) msg.offset())
); );
} }

View File

@ -51,7 +51,8 @@ public class WorkerSourceTask implements WorkerTask {
private ConnectorTaskId id; private ConnectorTaskId id;
private SourceTask task; private SourceTask task;
private final Converter converter; private final Converter keyConverter;
private final Converter valueConverter;
private KafkaProducer<Object, Object> producer; private KafkaProducer<Object, Object> producer;
private WorkerSourceTaskThread workThread; private WorkerSourceTaskThread workThread;
private OffsetStorageReader offsetReader; private OffsetStorageReader offsetReader;
@ -69,13 +70,14 @@ public class WorkerSourceTask implements WorkerTask {
private boolean flushing; private boolean flushing;
public WorkerSourceTask(ConnectorTaskId id, SourceTask task, public WorkerSourceTask(ConnectorTaskId id, SourceTask task,
Converter converter, Converter keyConverter, Converter valueConverter,
KafkaProducer<Object, Object> producer, KafkaProducer<Object, Object> producer,
OffsetStorageReader offsetReader, OffsetStorageWriter offsetWriter, OffsetStorageReader offsetReader, OffsetStorageWriter offsetWriter,
WorkerConfig workerConfig, Time time) { WorkerConfig workerConfig, Time time) {
this.id = id; this.id = id;
this.task = task; this.task = task;
this.converter = converter; this.keyConverter = keyConverter;
this.valueConverter = valueConverter;
this.producer = producer; this.producer = producer;
this.offsetReader = offsetReader; this.offsetReader = offsetReader;
this.offsetWriter = offsetWriter; this.offsetWriter = offsetWriter;
@ -132,8 +134,8 @@ public class WorkerSourceTask implements WorkerTask {
for (SourceRecord record : records) { for (SourceRecord record : records) {
final ProducerRecord<Object, Object> producerRecord final ProducerRecord<Object, Object> producerRecord
= new ProducerRecord<>(record.getTopic(), record.getKafkaPartition(), = new ProducerRecord<>(record.getTopic(), record.getKafkaPartition(),
converter.fromCopycatData(record.getKey()), keyConverter.fromCopycatData(record.getKey()),
converter.fromCopycatData(record.getValue())); valueConverter.fromCopycatData(record.getValue()));
log.trace("Appending record with key {}, value {}", record.getKey(), record.getValue()); log.trace("Appending record with key {}, value {}", record.getKey(), record.getValue());
if (!flushing) { if (!flushing) {
outstandingMessages.put(producerRecord, producerRecord); outstandingMessages.put(producerRecord, producerRecord);

View File

@ -39,17 +39,19 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
private final OffsetBackingStore backingStore; private final OffsetBackingStore backingStore;
private final String namespace; private final String namespace;
private final Converter converter; private final Converter keyConverter;
private final Converter valueConverter;
private final Serializer keySerializer; private final Serializer keySerializer;
private final Deserializer valueDeserializer; private final Deserializer valueDeserializer;
public OffsetStorageReaderImpl(OffsetBackingStore backingStore, String namespace, public OffsetStorageReaderImpl(OffsetBackingStore backingStore, String namespace,
Converter converter, Converter keyConverter, Converter valueConverter,
Serializer keySerializer, Serializer keySerializer,
Deserializer valueDeserializer) { Deserializer valueDeserializer) {
this.backingStore = backingStore; this.backingStore = backingStore;
this.namespace = namespace; this.namespace = namespace;
this.converter = converter; this.keyConverter = keyConverter;
this.valueConverter = valueConverter;
this.keySerializer = keySerializer; this.keySerializer = keySerializer;
this.valueDeserializer = valueDeserializer; this.valueDeserializer = valueDeserializer;
} }
@ -65,7 +67,7 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
Map<ByteBuffer, Object> serializedToOriginal = new HashMap<>(partitions.size()); Map<ByteBuffer, Object> serializedToOriginal = new HashMap<>(partitions.size());
for (Object key : partitions) { for (Object key : partitions) {
try { try {
byte[] keySerialized = keySerializer.serialize(namespace, converter.fromCopycatData(key)); byte[] keySerialized = keySerializer.serialize(namespace, keyConverter.fromCopycatData(key));
ByteBuffer keyBuffer = (keySerialized != null) ? ByteBuffer.wrap(keySerialized) : null; ByteBuffer keyBuffer = (keySerialized != null) ? ByteBuffer.wrap(keySerialized) : null;
serializedToOriginal.put(keyBuffer, key); serializedToOriginal.put(keyBuffer, key);
} catch (Throwable t) { } catch (Throwable t) {
@ -95,7 +97,7 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
continue; continue;
} }
Object origKey = serializedToOriginal.get(rawEntry.getKey()); Object origKey = serializedToOriginal.get(rawEntry.getKey());
Object deserializedValue = converter.toCopycatData( Object deserializedValue = valueConverter.toCopycatData(
valueDeserializer.deserialize(namespace, rawEntry.getValue().array()) valueDeserializer.deserialize(namespace, rawEntry.getValue().array())
); );

View File

@ -67,7 +67,8 @@ public class OffsetStorageWriter {
private static final Logger log = LoggerFactory.getLogger(OffsetStorageWriter.class); private static final Logger log = LoggerFactory.getLogger(OffsetStorageWriter.class);
private final OffsetBackingStore backingStore; private final OffsetBackingStore backingStore;
private final Converter converter; private final Converter keyConverter;
private final Converter valueConverter;
private final Serializer keySerializer; private final Serializer keySerializer;
private final Serializer valueSerializer; private final Serializer valueSerializer;
private final String namespace; private final String namespace;
@ -79,11 +80,12 @@ public class OffsetStorageWriter {
private long currentFlushId = 0; private long currentFlushId = 0;
public OffsetStorageWriter(OffsetBackingStore backingStore, public OffsetStorageWriter(OffsetBackingStore backingStore,
String namespace, Converter converter, String namespace, Converter keyConverter, Converter valueConverter,
Serializer keySerializer, Serializer valueSerializer) { Serializer keySerializer, Serializer valueSerializer) {
this.backingStore = backingStore; this.backingStore = backingStore;
this.namespace = namespace; this.namespace = namespace;
this.converter = converter; this.keyConverter = keyConverter;
this.valueConverter = valueConverter;
this.keySerializer = keySerializer; this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer; this.valueSerializer = valueSerializer;
} }
@ -134,9 +136,9 @@ public class OffsetStorageWriter {
try { try {
offsetsSerialized = new HashMap<>(); offsetsSerialized = new HashMap<>();
for (Map.Entry<Object, Object> entry : toFlush.entrySet()) { for (Map.Entry<Object, Object> entry : toFlush.entrySet()) {
byte[] key = keySerializer.serialize(namespace, converter.fromCopycatData(entry.getKey())); byte[] key = keySerializer.serialize(namespace, keyConverter.fromCopycatData(entry.getKey()));
ByteBuffer keyBuffer = (key != null) ? ByteBuffer.wrap(key) : null; ByteBuffer keyBuffer = (key != null) ? ByteBuffer.wrap(key) : null;
byte[] value = valueSerializer.serialize(namespace, converter.fromCopycatData(entry.getValue())); byte[] value = valueSerializer.serialize(namespace, valueConverter.fromCopycatData(entry.getValue()));
ByteBuffer valueBuffer = (value != null) ? ByteBuffer.wrap(value) : null; ByteBuffer valueBuffer = (value != null) ? ByteBuffer.wrap(value) : null;
offsetsSerialized.put(keyBuffer, valueBuffer); offsetsSerialized.put(keyBuffer, valueBuffer);
} }

View File

@ -60,7 +60,8 @@ public class WorkerSinkTaskTest extends ThreadedTest {
private Time time; private Time time;
private SinkTask sinkTask; private SinkTask sinkTask;
private WorkerConfig workerConfig; private WorkerConfig workerConfig;
private Converter converter; private Converter keyConverter;
private Converter valueConverter;
private WorkerSinkTask workerTask; private WorkerSinkTask workerTask;
private KafkaConsumer<Object, Object> consumer; private KafkaConsumer<Object, Object> consumer;
private WorkerSinkTaskThread workerThread; private WorkerSinkTaskThread workerThread;
@ -73,16 +74,18 @@ public class WorkerSinkTaskTest extends ThreadedTest {
time = new MockTime(); time = new MockTime();
sinkTask = PowerMock.createMock(SinkTask.class); sinkTask = PowerMock.createMock(SinkTask.class);
Properties workerProps = new Properties(); Properties workerProps = new Properties();
workerProps.setProperty("converter", "org.apache.kafka.copycat.json.JsonConverter"); workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
workerProps.setProperty("key.serializer", "org.apache.kafka.copycat.json.JsonSerializer"); workerProps.setProperty("key.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
workerProps.setProperty("value.serializer", "org.apache.kafka.copycat.json.JsonSerializer"); workerProps.setProperty("value.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer"); workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
workerProps.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer"); workerProps.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
workerConfig = new WorkerConfig(workerProps); workerConfig = new WorkerConfig(workerProps);
converter = PowerMock.createMock(Converter.class); keyConverter = PowerMock.createMock(Converter.class);
valueConverter = PowerMock.createMock(Converter.class);
workerTask = PowerMock.createPartialMock( workerTask = PowerMock.createPartialMock(
WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"}, WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"},
taskId, sinkTask, workerConfig, converter, time); taskId, sinkTask, workerConfig, keyConverter, valueConverter, time);
recordsReturned = 0; recordsReturned = 0;
} }
@ -135,9 +138,9 @@ public class WorkerSinkTaskTest extends ThreadedTest {
new ConsumerRecord<Object, Object>("topic", 0, 0, rawKey, rawValue)))); new ConsumerRecord<Object, Object>("topic", 0, 0, rawKey, rawValue))));
// Exact data doesn't matter, but should be passed directly to sink task // Exact data doesn't matter, but should be passed directly to sink task
EasyMock.expect(converter.toCopycatData(rawKey)) EasyMock.expect(keyConverter.toCopycatData(rawKey))
.andReturn(record); .andReturn(record);
EasyMock.expect(converter.toCopycatData(rawValue)) EasyMock.expect(valueConverter.toCopycatData(rawValue))
.andReturn(record); .andReturn(record);
Capture<Collection<SinkRecord>> capturedRecords Capture<Collection<SinkRecord>> capturedRecords
= EasyMock.newCapture(CaptureType.ALL); = EasyMock.newCapture(CaptureType.ALL);
@ -313,8 +316,8 @@ public class WorkerSinkTaskTest extends ThreadedTest {
return records; return records;
} }
}); });
EasyMock.expect(converter.toCopycatData(KEY)).andReturn(KEY).anyTimes(); EasyMock.expect(keyConverter.toCopycatData(KEY)).andReturn(KEY).anyTimes();
EasyMock.expect(converter.toCopycatData(VALUE)).andReturn(VALUE).anyTimes(); EasyMock.expect(valueConverter.toCopycatData(VALUE)).andReturn(VALUE).anyTimes();
Capture<Collection<SinkRecord>> capturedRecords = EasyMock.newCapture(CaptureType.ALL); Capture<Collection<SinkRecord>> capturedRecords = EasyMock.newCapture(CaptureType.ALL);
sinkTask.put(EasyMock.capture(capturedRecords)); sinkTask.put(EasyMock.capture(capturedRecords));
EasyMock.expectLastCall().anyTimes(); EasyMock.expectLastCall().anyTimes();

View File

@ -55,15 +55,18 @@ public class WorkerSourceTaskTest extends ThreadedTest {
private static final byte[] PARTITION_BYTES = "partition".getBytes(); private static final byte[] PARTITION_BYTES = "partition".getBytes();
private static final byte[] OFFSET_BYTES = "offset-1".getBytes(); private static final byte[] OFFSET_BYTES = "offset-1".getBytes();
private static final Integer KEY = -1;
private static final Integer RECORD = 12; private static final Integer RECORD = 12;
// The actual format of this data doesn't matter -- we just want to see that the right version // The actual format of this data doesn't matter -- we just want to see that the right version
// is used in the right place. // is used in the right place.
private static final String CONVERTED_KEY = "converted-key";
private static final String CONVERTED_RECORD = "converted-record"; private static final String CONVERTED_RECORD = "converted-record";
private ConnectorTaskId taskId = new ConnectorTaskId("job", 0); private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
private WorkerConfig config; private WorkerConfig config;
private SourceTask sourceTask; private SourceTask sourceTask;
private Converter converter; private Converter keyConverter;
private Converter valueConverter;
private KafkaProducer<Object, Object> producer; private KafkaProducer<Object, Object> producer;
private OffsetStorageReader offsetReader; private OffsetStorageReader offsetReader;
private OffsetStorageWriter offsetWriter; private OffsetStorageWriter offsetWriter;
@ -74,22 +77,23 @@ public class WorkerSourceTaskTest extends ThreadedTest {
private static final Properties EMPTY_TASK_PROPS = new Properties(); private static final Properties EMPTY_TASK_PROPS = new Properties();
private static final List<SourceRecord> RECORDS = Arrays.asList( private static final List<SourceRecord> RECORDS = Arrays.asList(
new SourceRecord(PARTITION_BYTES, OFFSET_BYTES, new SourceRecord(PARTITION_BYTES, OFFSET_BYTES, "topic", null, KEY, RECORD)
"topic", RECORD)
); );
@Override @Override
public void setup() { public void setup() {
super.setup(); super.setup();
Properties workerProps = new Properties(); Properties workerProps = new Properties();
workerProps.setProperty("converter", "org.apache.kafka.copycat.json.JsonConverter"); workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
workerProps.setProperty("key.serializer", "org.apache.kafka.copycat.json.JsonSerializer"); workerProps.setProperty("key.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
workerProps.setProperty("value.serializer", "org.apache.kafka.copycat.json.JsonSerializer"); workerProps.setProperty("value.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer"); workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
workerProps.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer"); workerProps.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
config = new WorkerConfig(workerProps); config = new WorkerConfig(workerProps);
sourceTask = PowerMock.createMock(SourceTask.class); sourceTask = PowerMock.createMock(SourceTask.class);
converter = PowerMock.createMock(Converter.class); keyConverter = PowerMock.createMock(Converter.class);
valueConverter = PowerMock.createMock(Converter.class);
producer = PowerMock.createMock(KafkaProducer.class); producer = PowerMock.createMock(KafkaProducer.class);
offsetReader = PowerMock.createMock(OffsetStorageReader.class); offsetReader = PowerMock.createMock(OffsetStorageReader.class);
offsetWriter = PowerMock.createMock(OffsetStorageWriter.class); offsetWriter = PowerMock.createMock(OffsetStorageWriter.class);
@ -98,9 +102,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
} }
private void createWorkerTask() { private void createWorkerTask() {
workerTask = new WorkerSourceTask(taskId, sourceTask, converter, producer, workerTask = new WorkerSourceTask(taskId, sourceTask, keyConverter, valueConverter, producer,
offsetReader, offsetWriter, offsetReader, offsetWriter, config, new SystemTime());
config, new SystemTime());
} }
@Test @Test
@ -193,14 +196,14 @@ public class WorkerSourceTaskTest extends ThreadedTest {
List<SourceRecord> records = new ArrayList<>(); List<SourceRecord> records = new ArrayList<>();
// Can just use the same record for key and value // Can just use the same record for key and value
records.add(new SourceRecord(PARTITION_BYTES, OFFSET_BYTES, "topic", null, RECORD, RECORD)); records.add(new SourceRecord(PARTITION_BYTES, OFFSET_BYTES, "topic", null, KEY, RECORD));
Capture<ProducerRecord> sent = expectSendRecord(); Capture<ProducerRecord> sent = expectSendRecord();
PowerMock.replayAll(); PowerMock.replayAll();
Whitebox.invokeMethod(workerTask, "sendRecords", records); Whitebox.invokeMethod(workerTask, "sendRecords", records);
assertEquals(CONVERTED_RECORD, sent.getValue().key()); assertEquals(CONVERTED_KEY, sent.getValue().key());
assertEquals(CONVERTED_RECORD, sent.getValue().value()); assertEquals(CONVERTED_RECORD, sent.getValue().value());
PowerMock.verifyAll(); PowerMock.verifyAll();
@ -226,8 +229,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
} }
private Capture<ProducerRecord> expectSendRecord() throws InterruptedException { private Capture<ProducerRecord> expectSendRecord() throws InterruptedException {
EasyMock.expect(converter.fromCopycatData(null)).andStubReturn(null); EasyMock.expect(keyConverter.fromCopycatData(KEY)).andStubReturn(CONVERTED_KEY);
EasyMock.expect(converter.fromCopycatData(RECORD)).andStubReturn(CONVERTED_RECORD); EasyMock.expect(valueConverter.fromCopycatData(RECORD)).andStubReturn(CONVERTED_RECORD);
Capture<ProducerRecord> sent = EasyMock.newCapture(); Capture<ProducerRecord> sent = EasyMock.newCapture();
// 1. Converted data passed to the producer, which will need callbacks invoked for flush to work // 1. Converted data passed to the producer, which will need callbacks invoked for flush to work

View File

@ -60,7 +60,8 @@ public class WorkerTest extends ThreadedTest {
super.setup(); super.setup();
Properties workerProps = new Properties(); Properties workerProps = new Properties();
workerProps.setProperty("converter", "org.apache.kafka.copycat.json.JsonConverter"); workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
workerProps.setProperty("key.serializer", "org.apache.kafka.copycat.json.JsonSerializer"); workerProps.setProperty("key.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
workerProps.setProperty("value.serializer", "org.apache.kafka.copycat.json.JsonSerializer"); workerProps.setProperty("value.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer"); workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
@ -87,6 +88,7 @@ public class WorkerTest extends ThreadedTest {
PowerMock.expectNew( PowerMock.expectNew(
WorkerSourceTask.class, EasyMock.eq(taskId), EasyMock.eq(task), WorkerSourceTask.class, EasyMock.eq(taskId), EasyMock.eq(task),
EasyMock.anyObject(Converter.class), EasyMock.anyObject(Converter.class),
EasyMock.anyObject(Converter.class),
EasyMock.anyObject(KafkaProducer.class), EasyMock.anyObject(KafkaProducer.class),
EasyMock.anyObject(OffsetStorageReader.class), EasyMock.anyObject(OffsetStorageReader.class),
EasyMock.anyObject(OffsetStorageWriter.class), EasyMock.anyObject(OffsetStorageWriter.class),
@ -132,6 +134,7 @@ public class WorkerTest extends ThreadedTest {
PowerMock.expectNew( PowerMock.expectNew(
WorkerSourceTask.class, EasyMock.eq(taskId), EasyMock.eq(task), WorkerSourceTask.class, EasyMock.eq(taskId), EasyMock.eq(task),
EasyMock.anyObject(Converter.class), EasyMock.anyObject(Converter.class),
EasyMock.anyObject(Converter.class),
EasyMock.anyObject(KafkaProducer.class), EasyMock.anyObject(KafkaProducer.class),
EasyMock.anyObject(OffsetStorageReader.class), EasyMock.anyObject(OffsetStorageReader.class),
EasyMock.anyObject(OffsetStorageWriter.class), EasyMock.anyObject(OffsetStorageWriter.class),

View File

@ -49,7 +49,8 @@ public class OffsetStorageWriterTest {
ByteBuffer.wrap(OFFSET_VALUE_SERIALIZED)); ByteBuffer.wrap(OFFSET_VALUE_SERIALIZED));
private OffsetBackingStore store; private OffsetBackingStore store;
private Converter converter; private Converter keyConverter;
private Converter valueConverter;
private Serializer keySerializer; private Serializer keySerializer;
private Serializer valueSerializer; private Serializer valueSerializer;
private OffsetStorageWriter writer; private OffsetStorageWriter writer;
@ -61,10 +62,11 @@ public class OffsetStorageWriterTest {
@Before @Before
public void setup() { public void setup() {
store = PowerMock.createMock(OffsetBackingStore.class); store = PowerMock.createMock(OffsetBackingStore.class);
converter = PowerMock.createMock(Converter.class); keyConverter = PowerMock.createMock(Converter.class);
valueConverter = PowerMock.createMock(Converter.class);
keySerializer = PowerMock.createMock(Serializer.class); keySerializer = PowerMock.createMock(Serializer.class);
valueSerializer = PowerMock.createMock(Serializer.class); valueSerializer = PowerMock.createMock(Serializer.class);
writer = new OffsetStorageWriter(store, NAMESPACE, converter, keySerializer, valueSerializer); writer = new OffsetStorageWriter(store, NAMESPACE, keyConverter, valueConverter, keySerializer, valueSerializer);
service = Executors.newFixedThreadPool(1); service = Executors.newFixedThreadPool(1);
} }
@ -193,9 +195,9 @@ public class OffsetStorageWriterTest {
private void expectStore(final Callback<Void> callback, private void expectStore(final Callback<Void> callback,
final boolean fail, final boolean fail,
final CountDownLatch waitForCompletion) { final CountDownLatch waitForCompletion) {
EasyMock.expect(converter.fromCopycatData(OFFSET_KEY)).andReturn(OFFSET_KEY_CONVERTED); EasyMock.expect(keyConverter.fromCopycatData(OFFSET_KEY)).andReturn(OFFSET_KEY_CONVERTED);
EasyMock.expect(keySerializer.serialize(NAMESPACE, OFFSET_KEY_CONVERTED)).andReturn(OFFSET_KEY_SERIALIZED); EasyMock.expect(keySerializer.serialize(NAMESPACE, OFFSET_KEY_CONVERTED)).andReturn(OFFSET_KEY_SERIALIZED);
EasyMock.expect(converter.fromCopycatData(OFFSET_VALUE)).andReturn(OFFSET_VALUE_CONVERTED); EasyMock.expect(valueConverter.fromCopycatData(OFFSET_VALUE)).andReturn(OFFSET_VALUE_CONVERTED);
EasyMock.expect(valueSerializer.serialize(NAMESPACE, OFFSET_VALUE_CONVERTED)).andReturn(OFFSET_VALUE_SERIALIZED); EasyMock.expect(valueSerializer.serialize(NAMESPACE, OFFSET_VALUE_CONVERTED)).andReturn(OFFSET_VALUE_SERIALIZED);
final Capture<Callback<Void>> storeCallback = Capture.newInstance(); final Capture<Callback<Void>> storeCallback = Capture.newInstance();