Split Copycat converter option into two options for key and value.

Ewen Cheslack-Postava 2015-08-01 13:49:30 -07:00
parent 0b5a1a0c57
commit b194c7348f
12 changed files with 91 additions and 65 deletions
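In short: the single "converter" worker option is replaced by separate "key.converter" and "value.converter" options, mirroring the already-split key/value serializer and deserializer options. A minimal sketch of the new wiring, pieced together from the test setup and the Worker constructor in the diffs below (the two import paths are assumptions; the diff view does not show them):

    import java.util.Properties;
    import org.apache.kafka.copycat.runtime.WorkerConfig;  // assumed package
    import org.apache.kafka.copycat.storage.Converter;     // assumed package

    public class ConverterWiringSketch {
        public static void main(String[] args) {
            Properties workerProps = new Properties();
            // Converters are now configured per key and value...
            workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
            workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
            // ...alongside the existing per-key/value (de)serializer options.
            workerProps.setProperty("key.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
            workerProps.setProperty("value.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
            workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
            workerProps.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
            WorkerConfig config = new WorkerConfig(workerProps);

            // Worker.java below resolves the two options independently:
            Converter keyConverter =
                    config.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
            Converter valueConverter =
                    config.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
        }
    }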

View File

@@ -15,7 +15,8 @@
 bootstrap.servers=localhost:9092
-converter=org.apache.kafka.copycat.json.JsonConverter
+key.converter=org.apache.kafka.copycat.json.JsonConverter
+value.converter=org.apache.kafka.copycat.json.JsonConverter
 key.serializer=org.apache.kafka.copycat.json.JsonSerializer
 value.serializer=org.apache.kafka.copycat.json.JsonSerializer
 key.deserializer=org.apache.kafka.copycat.json.JsonDeserializer

View File

@@ -16,7 +16,8 @@
 # These are defaults. This file just demonstrates how to override some settings.
 bootstrap.servers=localhost:9092
-converter=org.apache.kafka.copycat.json.JsonConverter
+key.converter=org.apache.kafka.copycat.json.JsonConverter
+value.converter=org.apache.kafka.copycat.json.JsonConverter
 key.serializer=org.apache.kafka.copycat.json.JsonSerializer
 value.serializer=org.apache.kafka.copycat.json.JsonSerializer
 key.deserializer=org.apache.kafka.copycat.json.JsonDeserializer

View File

@@ -49,9 +49,13 @@ public class WorkerConfig extends AbstractConfig {
             + "than one, though, in case a server is down).";
     public static final String BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092";
-    public static final String CONVERTER_CLASS_CONFIG = "converter";
-    public static final String CONVERTER_CLASS_DOC =
-            "Converter class for Copycat data that implements the <code>Converter</code> interface.";
+    public static final String KEY_CONVERTER_CLASS_CONFIG = "key.converter";
+    public static final String KEY_CONVERTER_CLASS_DOC =
+            "Converter class for key Copycat data that implements the <code>Converter</code> interface.";
+    public static final String VALUE_CONVERTER_CLASS_CONFIG = "value.converter";
+    public static final String VALUE_CONVERTER_CLASS_DOC =
+            "Converter class for value Copycat data that implements the <code>Converter</code> interface.";
     public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
     public static final String KEY_SERIALIZER_CLASS_DOC =
@@ -101,8 +105,10 @@ public class WorkerConfig extends AbstractConfig {
         .define(CLUSTER_CONFIG, Type.STRING, CLUSTER_DEFAULT, Importance.HIGH, CLUSTER_CONFIG_DOC)
         .define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, BOOTSTRAP_SERVERS_DEFAULT,
                 Importance.HIGH, BOOTSTRAP_SERVERS_DOC)
-        .define(CONVERTER_CLASS_CONFIG, Type.CLASS,
-                Importance.HIGH, CONVERTER_CLASS_DOC)
+        .define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS,
+                Importance.HIGH, KEY_CONVERTER_CLASS_DOC)
+        .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
+                Importance.HIGH, VALUE_CONVERTER_CLASS_DOC)
         .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
                 Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
         .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
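Note that, like the old "converter" option, neither new option declares a default in its define(...) call; assuming Kafka's usual ConfigDef semantics, both options are therefore mandatory, and an existing worker config that still sets only "converter" will fail to parse. A hypothetical illustration:

    // Hypothetical migration check: the old single-option config no longer validates.
    Properties oldProps = new Properties();
    oldProps.setProperty("converter", "org.apache.kafka.copycat.json.JsonConverter");
    // new WorkerConfig(oldProps) now throws a ConfigException for the missing
    // key.converter and value.converter options; "converter" itself is simply unused.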

View File

@@ -53,7 +53,8 @@ public class Worker {
     private Time time;
     private WorkerConfig config;
-    private Converter converter;
+    private Converter keyConverter;
+    private Converter valueConverter;
     private OffsetBackingStore offsetBackingStore;
     private Serializer offsetKeySerializer;
     private Serializer offsetValueSerializer;
@@ -74,7 +75,8 @@ public class Worker {
                   Deserializer offsetKeyDeserializer, Deserializer offsetValueDeserializer) {
         this.time = time;
         this.config = config;
-        this.converter = config.getConfiguredInstance(WorkerConfig.CONVERTER_CLASS_CONFIG, Converter.class);
+        this.keyConverter = config.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
+        this.valueConverter = config.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
         this.offsetBackingStore = offsetBackingStore;
         if (offsetKeySerializer != null) {
@@ -183,17 +185,14 @@ public class Worker {
         final WorkerTask workerTask;
         if (task instanceof SourceTask) {
             SourceTask sourceTask = (SourceTask) task;
-            OffsetStorageReader offsetReader
-                    = new OffsetStorageReaderImpl(offsetBackingStore, id.getConnector(), converter,
-                    offsetKeySerializer, offsetValueDeserializer);
-            OffsetStorageWriter offsetWriter
-                    = new OffsetStorageWriter(offsetBackingStore, id.getConnector(), converter,
-                    offsetKeySerializer, offsetValueSerializer);
-            workerTask = new WorkerSourceTask(id, sourceTask, converter, producer,
-                    offsetReader, offsetWriter,
-                    config, time);
+            OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.getConnector(),
+                    keyConverter, valueConverter, offsetKeySerializer, offsetValueDeserializer);
+            OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.getConnector(),
+                    keyConverter, valueConverter, offsetKeySerializer, offsetValueSerializer);
+            workerTask = new WorkerSourceTask(id, sourceTask, keyConverter, valueConverter, producer,
+                    offsetReader, offsetWriter, config, time);
         } else if (task instanceof SinkTask) {
-            workerTask = new WorkerSinkTask(id, (SinkTask) task, config, converter, time);
+            workerTask = new WorkerSinkTask(id, (SinkTask) task, config, keyConverter, valueConverter, time);
         } else {
             log.error("Tasks must be a subclass of either SourceTask or SinkTask", task);
             throw new CopycatException("Tasks must be a subclass of either SourceTask or SinkTask");

View File

@@ -45,17 +45,19 @@ public class WorkerSinkTask implements WorkerTask {
     private final SinkTask task;
     private final WorkerConfig workerConfig;
     private final Time time;
-    private final Converter converter;
+    private final Converter keyConverter;
+    private final Converter valueConverter;
     private WorkerSinkTaskThread workThread;
     private KafkaConsumer<Object, Object> consumer;
     private final SinkTaskContext context;
     public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConfig,
-                          Converter converter, Time time) {
+                          Converter keyConverter, Converter valueConverter, Time time) {
         this.id = id;
         this.task = task;
         this.workerConfig = workerConfig;
-        this.converter = converter;
+        this.keyConverter = keyConverter;
+        this.valueConverter = valueConverter;
         context = new SinkTaskContextImpl();
         this.time = time;
     }
@@ -207,8 +209,8 @@ public class WorkerSinkTask implements WorkerTask {
             log.trace("Consuming message with key {}, value {}", msg.key(), msg.value());
             records.add(
                     new SinkRecord(msg.topic(), msg.partition(),
-                            converter.toCopycatData(msg.key()),
-                            converter.toCopycatData(msg.value()),
+                            keyConverter.toCopycatData(msg.key()),
+                            valueConverter.toCopycatData(msg.value()),
                             msg.offset())
             );
         }

View File

@@ -51,7 +51,8 @@ public class WorkerSourceTask implements WorkerTask {
     private ConnectorTaskId id;
     private SourceTask task;
-    private final Converter converter;
+    private final Converter keyConverter;
+    private final Converter valueConverter;
     private KafkaProducer<Object, Object> producer;
     private WorkerSourceTaskThread workThread;
     private OffsetStorageReader offsetReader;
@@ -69,13 +70,14 @@ public class WorkerSourceTask implements WorkerTask {
     private boolean flushing;
     public WorkerSourceTask(ConnectorTaskId id, SourceTask task,
-                            Converter converter,
+                            Converter keyConverter, Converter valueConverter,
                             KafkaProducer<Object, Object> producer,
                             OffsetStorageReader offsetReader, OffsetStorageWriter offsetWriter,
                             WorkerConfig workerConfig, Time time) {
         this.id = id;
         this.task = task;
-        this.converter = converter;
+        this.keyConverter = keyConverter;
+        this.valueConverter = valueConverter;
         this.producer = producer;
         this.offsetReader = offsetReader;
         this.offsetWriter = offsetWriter;
@@ -132,8 +134,8 @@ public class WorkerSourceTask implements WorkerTask {
         for (SourceRecord record : records) {
             final ProducerRecord<Object, Object> producerRecord
                     = new ProducerRecord<>(record.getTopic(), record.getKafkaPartition(),
-                            converter.fromCopycatData(record.getKey()),
-                            converter.fromCopycatData(record.getValue()));
+                            keyConverter.fromCopycatData(record.getKey()),
+                            valueConverter.fromCopycatData(record.getValue()));
             log.trace("Appending record with key {}, value {}", record.getKey(), record.getValue());
             if (!flushing) {
                 outstandingMessages.put(producerRecord, producerRecord);
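Between the sink task earlier (toCopycatData on consumed keys and values) and the source task above (fromCopycatData on produced keys and values), the diffs exercise exactly two Converter methods. A sketch of the contract as used in this commit; the real Copycat interface may carry a different package and more precise signatures:

    // Sketch only: the two methods WorkerSinkTask/WorkerSourceTask call.
    public interface Converter {
        // Copycat data API representation -> serializer-friendly object
        Object fromCopycatData(Object copycatData);
        // deserialized object -> Copycat data API representation
        Object toCopycatData(Object rawData);
    }

Splitting the option is therefore purely a wiring change: the same interface is instantiated twice, and each instance is applied to one half of every record.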

View File

@@ -39,17 +39,19 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
     private final OffsetBackingStore backingStore;
     private final String namespace;
-    private final Converter converter;
+    private final Converter keyConverter;
+    private final Converter valueConverter;
     private final Serializer keySerializer;
     private final Deserializer valueDeserializer;
     public OffsetStorageReaderImpl(OffsetBackingStore backingStore, String namespace,
-                                   Converter converter,
+                                   Converter keyConverter, Converter valueConverter,
                                    Serializer keySerializer,
                                    Deserializer valueDeserializer) {
         this.backingStore = backingStore;
         this.namespace = namespace;
-        this.converter = converter;
+        this.keyConverter = keyConverter;
+        this.valueConverter = valueConverter;
         this.keySerializer = keySerializer;
         this.valueDeserializer = valueDeserializer;
     }
@@ -65,7 +67,7 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
         Map<ByteBuffer, Object> serializedToOriginal = new HashMap<>(partitions.size());
         for (Object key : partitions) {
             try {
-                byte[] keySerialized = keySerializer.serialize(namespace, converter.fromCopycatData(key));
+                byte[] keySerialized = keySerializer.serialize(namespace, keyConverter.fromCopycatData(key));
                 ByteBuffer keyBuffer = (keySerialized != null) ? ByteBuffer.wrap(keySerialized) : null;
                 serializedToOriginal.put(keyBuffer, key);
             } catch (Throwable t) {
@@ -95,7 +97,7 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
                 continue;
             }
             Object origKey = serializedToOriginal.get(rawEntry.getKey());
-            Object deserializedValue = converter.toCopycatData(
+            Object deserializedValue = valueConverter.toCopycatData(
                     valueDeserializer.deserialize(namespace, rawEntry.getValue().array())
             );
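One subtlety in the reader: keyConverter is used on the read path too. To look up offsets, the reader re-serializes each requested partition key (fromCopycatData, then serialize) so it can match the raw ByteBuffer keys held by the backing store; only values flow through the deserialize-then-toCopycatData direction. The writer below is the mirror image, serializing both halves. Together they give this round trip (a sketch; method names come from these diffs, while the rawValue variable is a hypothetical stand-in for rawEntry.getValue()):

    // write path (OffsetStorageWriter): Copycat data -> bytes
    byte[] keyBytes = keySerializer.serialize(namespace, keyConverter.fromCopycatData(key));
    // read path (OffsetStorageReaderImpl): bytes -> Copycat data
    Object value = valueConverter.toCopycatData(
            valueDeserializer.deserialize(namespace, rawValue.array()));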

View File

@@ -67,7 +67,8 @@ public class OffsetStorageWriter {
     private static final Logger log = LoggerFactory.getLogger(OffsetStorageWriter.class);
     private final OffsetBackingStore backingStore;
-    private final Converter converter;
+    private final Converter keyConverter;
+    private final Converter valueConverter;
     private final Serializer keySerializer;
     private final Serializer valueSerializer;
     private final String namespace;
@@ -79,11 +80,12 @@ public class OffsetStorageWriter {
     private long currentFlushId = 0;
     public OffsetStorageWriter(OffsetBackingStore backingStore,
-                               String namespace, Converter converter,
+                               String namespace, Converter keyConverter, Converter valueConverter,
                                Serializer keySerializer, Serializer valueSerializer) {
         this.backingStore = backingStore;
         this.namespace = namespace;
-        this.converter = converter;
+        this.keyConverter = keyConverter;
+        this.valueConverter = valueConverter;
         this.keySerializer = keySerializer;
         this.valueSerializer = valueSerializer;
     }
@@ -134,9 +136,9 @@ public class OffsetStorageWriter {
         try {
             offsetsSerialized = new HashMap<>();
             for (Map.Entry<Object, Object> entry : toFlush.entrySet()) {
-                byte[] key = keySerializer.serialize(namespace, converter.fromCopycatData(entry.getKey()));
+                byte[] key = keySerializer.serialize(namespace, keyConverter.fromCopycatData(entry.getKey()));
                 ByteBuffer keyBuffer = (key != null) ? ByteBuffer.wrap(key) : null;
-                byte[] value = valueSerializer.serialize(namespace, converter.fromCopycatData(entry.getValue()));
+                byte[] value = valueSerializer.serialize(namespace, valueConverter.fromCopycatData(entry.getValue()));
                 ByteBuffer valueBuffer = (value != null) ? ByteBuffer.wrap(value) : null;
                 offsetsSerialized.put(keyBuffer, valueBuffer);
             }

View File

@@ -60,7 +60,8 @@ public class WorkerSinkTaskTest extends ThreadedTest {
     private Time time;
     private SinkTask sinkTask;
     private WorkerConfig workerConfig;
-    private Converter converter;
+    private Converter keyConverter;
+    private Converter valueConverter;
     private WorkerSinkTask workerTask;
     private KafkaConsumer<Object, Object> consumer;
     private WorkerSinkTaskThread workerThread;
@@ -73,16 +74,18 @@ public class WorkerSinkTaskTest extends ThreadedTest {
         time = new MockTime();
         sinkTask = PowerMock.createMock(SinkTask.class);
         Properties workerProps = new Properties();
-        workerProps.setProperty("converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
         workerProps.setProperty("key.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
        workerProps.setProperty("value.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
         workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
         workerProps.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
         workerConfig = new WorkerConfig(workerProps);
-        converter = PowerMock.createMock(Converter.class);
+        keyConverter = PowerMock.createMock(Converter.class);
+        valueConverter = PowerMock.createMock(Converter.class);
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"},
-                taskId, sinkTask, workerConfig, converter, time);
+                taskId, sinkTask, workerConfig, keyConverter, valueConverter, time);
         recordsReturned = 0;
     }
@@ -135,9 +138,9 @@ public class WorkerSinkTaskTest extends ThreadedTest {
                 new ConsumerRecord<Object, Object>("topic", 0, 0, rawKey, rawValue))));
         // Exact data doesn't matter, but should be passed directly to sink task
-        EasyMock.expect(converter.toCopycatData(rawKey))
+        EasyMock.expect(keyConverter.toCopycatData(rawKey))
                 .andReturn(record);
-        EasyMock.expect(converter.toCopycatData(rawValue))
+        EasyMock.expect(valueConverter.toCopycatData(rawValue))
                 .andReturn(record);
         Capture<Collection<SinkRecord>> capturedRecords
                 = EasyMock.newCapture(CaptureType.ALL);
@@ -313,8 +316,8 @@ public class WorkerSinkTaskTest extends ThreadedTest {
                 return records;
             }
         });
-        EasyMock.expect(converter.toCopycatData(KEY)).andReturn(KEY).anyTimes();
-        EasyMock.expect(converter.toCopycatData(VALUE)).andReturn(VALUE).anyTimes();
+        EasyMock.expect(keyConverter.toCopycatData(KEY)).andReturn(KEY).anyTimes();
+        EasyMock.expect(valueConverter.toCopycatData(VALUE)).andReturn(VALUE).anyTimes();
         Capture<Collection<SinkRecord>> capturedRecords = EasyMock.newCapture(CaptureType.ALL);
         sinkTask.put(EasyMock.capture(capturedRecords));
         EasyMock.expectLastCall().anyTimes();

View File

@@ -55,15 +55,18 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     private static final byte[] PARTITION_BYTES = "partition".getBytes();
     private static final byte[] OFFSET_BYTES = "offset-1".getBytes();
+    private static final Integer KEY = -1;
     private static final Integer RECORD = 12;
     // The actual format of this data doesn't matter -- we just want to see that the right version
     // is used in the right place.
+    private static final String CONVERTED_KEY = "converted-key";
     private static final String CONVERTED_RECORD = "converted-record";
     private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
     private WorkerConfig config;
     private SourceTask sourceTask;
-    private Converter converter;
+    private Converter keyConverter;
+    private Converter valueConverter;
     private KafkaProducer<Object, Object> producer;
     private OffsetStorageReader offsetReader;
     private OffsetStorageWriter offsetWriter;
@@ -74,22 +77,23 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     private static final Properties EMPTY_TASK_PROPS = new Properties();
     private static final List<SourceRecord> RECORDS = Arrays.asList(
-            new SourceRecord(PARTITION_BYTES, OFFSET_BYTES,
-                    "topic", RECORD)
+            new SourceRecord(PARTITION_BYTES, OFFSET_BYTES, "topic", null, KEY, RECORD)
     );
     @Override
     public void setup() {
         super.setup();
         Properties workerProps = new Properties();
-        workerProps.setProperty("converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
         workerProps.setProperty("key.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
         workerProps.setProperty("value.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
         workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
         workerProps.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
         config = new WorkerConfig(workerProps);
         sourceTask = PowerMock.createMock(SourceTask.class);
-        converter = PowerMock.createMock(Converter.class);
+        keyConverter = PowerMock.createMock(Converter.class);
+        valueConverter = PowerMock.createMock(Converter.class);
         producer = PowerMock.createMock(KafkaProducer.class);
         offsetReader = PowerMock.createMock(OffsetStorageReader.class);
         offsetWriter = PowerMock.createMock(OffsetStorageWriter.class);
@@ -98,9 +102,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     }
     private void createWorkerTask() {
-        workerTask = new WorkerSourceTask(taskId, sourceTask, converter, producer,
-                offsetReader, offsetWriter,
-                config, new SystemTime());
+        workerTask = new WorkerSourceTask(taskId, sourceTask, keyConverter, valueConverter, producer,
+                offsetReader, offsetWriter, config, new SystemTime());
     }
     @Test
@@ -193,14 +196,14 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         List<SourceRecord> records = new ArrayList<>();
         // Can just use the same record for key and value
-        records.add(new SourceRecord(PARTITION_BYTES, OFFSET_BYTES, "topic", null, RECORD, RECORD));
+        records.add(new SourceRecord(PARTITION_BYTES, OFFSET_BYTES, "topic", null, KEY, RECORD));
         Capture<ProducerRecord> sent = expectSendRecord();
         PowerMock.replayAll();
         Whitebox.invokeMethod(workerTask, "sendRecords", records);
-        assertEquals(CONVERTED_RECORD, sent.getValue().key());
+        assertEquals(CONVERTED_KEY, sent.getValue().key());
         assertEquals(CONVERTED_RECORD, sent.getValue().value());
         PowerMock.verifyAll();
@@ -226,8 +229,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     }
     private Capture<ProducerRecord> expectSendRecord() throws InterruptedException {
-        EasyMock.expect(converter.fromCopycatData(null)).andStubReturn(null);
-        EasyMock.expect(converter.fromCopycatData(RECORD)).andStubReturn(CONVERTED_RECORD);
+        EasyMock.expect(keyConverter.fromCopycatData(KEY)).andStubReturn(CONVERTED_KEY);
+        EasyMock.expect(valueConverter.fromCopycatData(RECORD)).andStubReturn(CONVERTED_RECORD);
         Capture<ProducerRecord> sent = EasyMock.newCapture();
         // 1. Converted data passed to the producer, which will need callbacks invoked for flush to work

View File

@@ -60,7 +60,8 @@ public class WorkerTest extends ThreadedTest {
         super.setup();
         Properties workerProps = new Properties();
-        workerProps.setProperty("converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
         workerProps.setProperty("key.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
         workerProps.setProperty("value.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
         workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
@@ -87,6 +88,7 @@ public class WorkerTest extends ThreadedTest {
         PowerMock.expectNew(
                 WorkerSourceTask.class, EasyMock.eq(taskId), EasyMock.eq(task),
                 EasyMock.anyObject(Converter.class),
+                EasyMock.anyObject(Converter.class),
                 EasyMock.anyObject(KafkaProducer.class),
                 EasyMock.anyObject(OffsetStorageReader.class),
                 EasyMock.anyObject(OffsetStorageWriter.class),
@@ -132,6 +134,7 @@ public class WorkerTest extends ThreadedTest {
         PowerMock.expectNew(
                 WorkerSourceTask.class, EasyMock.eq(taskId), EasyMock.eq(task),
                 EasyMock.anyObject(Converter.class),
+                EasyMock.anyObject(Converter.class),
                 EasyMock.anyObject(KafkaProducer.class),
                 EasyMock.anyObject(OffsetStorageReader.class),
                 EasyMock.anyObject(OffsetStorageWriter.class),

View File

@@ -49,7 +49,8 @@ public class OffsetStorageWriterTest {
             ByteBuffer.wrap(OFFSET_VALUE_SERIALIZED));
     private OffsetBackingStore store;
-    private Converter converter;
+    private Converter keyConverter;
+    private Converter valueConverter;
     private Serializer keySerializer;
     private Serializer valueSerializer;
     private OffsetStorageWriter writer;
@@ -61,10 +62,11 @@ public class OffsetStorageWriterTest {
     @Before
     public void setup() {
         store = PowerMock.createMock(OffsetBackingStore.class);
-        converter = PowerMock.createMock(Converter.class);
+        keyConverter = PowerMock.createMock(Converter.class);
+        valueConverter = PowerMock.createMock(Converter.class);
         keySerializer = PowerMock.createMock(Serializer.class);
         valueSerializer = PowerMock.createMock(Serializer.class);
-        writer = new OffsetStorageWriter(store, NAMESPACE, converter, keySerializer, valueSerializer);
+        writer = new OffsetStorageWriter(store, NAMESPACE, keyConverter, valueConverter, keySerializer, valueSerializer);
         service = Executors.newFixedThreadPool(1);
     }
@@ -193,9 +195,9 @@ public class OffsetStorageWriterTest {
     private void expectStore(final Callback<Void> callback,
                              final boolean fail,
                              final CountDownLatch waitForCompletion) {
-        EasyMock.expect(converter.fromCopycatData(OFFSET_KEY)).andReturn(OFFSET_KEY_CONVERTED);
+        EasyMock.expect(keyConverter.fromCopycatData(OFFSET_KEY)).andReturn(OFFSET_KEY_CONVERTED);
         EasyMock.expect(keySerializer.serialize(NAMESPACE, OFFSET_KEY_CONVERTED)).andReturn(OFFSET_KEY_SERIALIZED);
-        EasyMock.expect(converter.fromCopycatData(OFFSET_VALUE)).andReturn(OFFSET_VALUE_CONVERTED);
+        EasyMock.expect(valueConverter.fromCopycatData(OFFSET_VALUE)).andReturn(OFFSET_VALUE_CONVERTED);
         EasyMock.expect(valueSerializer.serialize(NAMESPACE, OFFSET_VALUE_CONVERTED)).andReturn(OFFSET_VALUE_SERIALIZED);
         final Capture<Callback<Void>> storeCallback = Capture.newInstance();