mirror of https://github.com/apache/kafka.git
Split Copycat converter option into two options for key and value.
This commit is contained in:
parent
0b5a1a0c57
commit
b194c7348f
|
@ -15,7 +15,8 @@
|
|||
|
||||
bootstrap.servers=localhost:9092
|
||||
|
||||
converter=org.apache.kafka.copycat.json.JsonConverter
|
||||
key.converter=org.apache.kafka.copycat.json.JsonConverter
|
||||
value.converter=org.apache.kafka.copycat.json.JsonConverter
|
||||
key.serializer=org.apache.kafka.copycat.json.JsonSerializer
|
||||
value.serializer=org.apache.kafka.copycat.json.JsonSerializer
|
||||
key.deserializer=org.apache.kafka.copycat.json.JsonDeserializer
|
||||
|
|
|
@ -16,7 +16,8 @@
|
|||
# These are defaults. This file just demonstrates how to override some settings.
|
||||
bootstrap.servers=localhost:9092
|
||||
|
||||
converter=org.apache.kafka.copycat.json.JsonConverter
|
||||
key.converter=org.apache.kafka.copycat.json.JsonConverter
|
||||
value.converter=org.apache.kafka.copycat.json.JsonConverter
|
||||
key.serializer=org.apache.kafka.copycat.json.JsonSerializer
|
||||
value.serializer=org.apache.kafka.copycat.json.JsonSerializer
|
||||
key.deserializer=org.apache.kafka.copycat.json.JsonDeserializer
|
||||
|
|
|
@ -49,9 +49,13 @@ public class WorkerConfig extends AbstractConfig {
|
|||
+ "than one, though, in case a server is down).";
|
||||
public static final String BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092";
|
||||
|
||||
public static final String CONVERTER_CLASS_CONFIG = "converter";
|
||||
public static final String CONVERTER_CLASS_DOC =
|
||||
"Converter class for Copycat data that implements the <code>Converter</code> interface.";
|
||||
public static final String KEY_CONVERTER_CLASS_CONFIG = "key.converter";
|
||||
public static final String KEY_CONVERTER_CLASS_DOC =
|
||||
"Converter class for key Copycat data that implements the <code>Converter</code> interface.";
|
||||
|
||||
public static final String VALUE_CONVERTER_CLASS_CONFIG = "value.converter";
|
||||
public static final String VALUE_CONVERTER_CLASS_DOC =
|
||||
"Converter class for value Copycat data that implements the <code>Converter</code> interface.";
|
||||
|
||||
public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
|
||||
public static final String KEY_SERIALIZER_CLASS_DOC =
|
||||
|
@ -101,8 +105,10 @@ public class WorkerConfig extends AbstractConfig {
|
|||
.define(CLUSTER_CONFIG, Type.STRING, CLUSTER_DEFAULT, Importance.HIGH, CLUSTER_CONFIG_DOC)
|
||||
.define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, BOOTSTRAP_SERVERS_DEFAULT,
|
||||
Importance.HIGH, BOOTSTRAP_SERVERS_DOC)
|
||||
.define(CONVERTER_CLASS_CONFIG, Type.CLASS,
|
||||
Importance.HIGH, CONVERTER_CLASS_DOC)
|
||||
.define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS,
|
||||
Importance.HIGH, KEY_CONVERTER_CLASS_DOC)
|
||||
.define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
|
||||
Importance.HIGH, VALUE_CONVERTER_CLASS_DOC)
|
||||
.define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
|
||||
Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
|
||||
.define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
|
||||
|
|
|
@ -53,7 +53,8 @@ public class Worker {
|
|||
|
||||
private Time time;
|
||||
private WorkerConfig config;
|
||||
private Converter converter;
|
||||
private Converter keyConverter;
|
||||
private Converter valueConverter;
|
||||
private OffsetBackingStore offsetBackingStore;
|
||||
private Serializer offsetKeySerializer;
|
||||
private Serializer offsetValueSerializer;
|
||||
|
@ -74,7 +75,8 @@ public class Worker {
|
|||
Deserializer offsetKeyDeserializer, Deserializer offsetValueDeserializer) {
|
||||
this.time = time;
|
||||
this.config = config;
|
||||
this.converter = config.getConfiguredInstance(WorkerConfig.CONVERTER_CLASS_CONFIG, Converter.class);
|
||||
this.keyConverter = config.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
|
||||
this.valueConverter = config.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
|
||||
this.offsetBackingStore = offsetBackingStore;
|
||||
|
||||
if (offsetKeySerializer != null) {
|
||||
|
@ -183,17 +185,14 @@ public class Worker {
|
|||
final WorkerTask workerTask;
|
||||
if (task instanceof SourceTask) {
|
||||
SourceTask sourceTask = (SourceTask) task;
|
||||
OffsetStorageReader offsetReader
|
||||
= new OffsetStorageReaderImpl(offsetBackingStore, id.getConnector(), converter,
|
||||
offsetKeySerializer, offsetValueDeserializer);
|
||||
OffsetStorageWriter offsetWriter
|
||||
= new OffsetStorageWriter(offsetBackingStore, id.getConnector(), converter,
|
||||
offsetKeySerializer, offsetValueSerializer);
|
||||
workerTask = new WorkerSourceTask(id, sourceTask, converter, producer,
|
||||
offsetReader, offsetWriter,
|
||||
config, time);
|
||||
OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.getConnector(),
|
||||
keyConverter, valueConverter, offsetKeySerializer, offsetValueDeserializer);
|
||||
OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.getConnector(),
|
||||
keyConverter, valueConverter, offsetKeySerializer, offsetValueSerializer);
|
||||
workerTask = new WorkerSourceTask(id, sourceTask, keyConverter, valueConverter, producer,
|
||||
offsetReader, offsetWriter, config, time);
|
||||
} else if (task instanceof SinkTask) {
|
||||
workerTask = new WorkerSinkTask(id, (SinkTask) task, config, converter, time);
|
||||
workerTask = new WorkerSinkTask(id, (SinkTask) task, config, keyConverter, valueConverter, time);
|
||||
} else {
|
||||
log.error("Tasks must be a subclass of either SourceTask or SinkTask", task);
|
||||
throw new CopycatException("Tasks must be a subclass of either SourceTask or SinkTask");
|
||||
|
|
|
@ -45,17 +45,19 @@ public class WorkerSinkTask implements WorkerTask {
|
|||
private final SinkTask task;
|
||||
private final WorkerConfig workerConfig;
|
||||
private final Time time;
|
||||
private final Converter converter;
|
||||
private final Converter keyConverter;
|
||||
private final Converter valueConverter;
|
||||
private WorkerSinkTaskThread workThread;
|
||||
private KafkaConsumer<Object, Object> consumer;
|
||||
private final SinkTaskContext context;
|
||||
|
||||
public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConfig,
|
||||
Converter converter, Time time) {
|
||||
Converter keyConverter, Converter valueConverter, Time time) {
|
||||
this.id = id;
|
||||
this.task = task;
|
||||
this.workerConfig = workerConfig;
|
||||
this.converter = converter;
|
||||
this.keyConverter = keyConverter;
|
||||
this.valueConverter = valueConverter;
|
||||
context = new SinkTaskContextImpl();
|
||||
this.time = time;
|
||||
}
|
||||
|
@ -207,8 +209,8 @@ public class WorkerSinkTask implements WorkerTask {
|
|||
log.trace("Consuming message with key {}, value {}", msg.key(), msg.value());
|
||||
records.add(
|
||||
new SinkRecord(msg.topic(), msg.partition(),
|
||||
converter.toCopycatData(msg.key()),
|
||||
converter.toCopycatData(msg.value()),
|
||||
keyConverter.toCopycatData(msg.key()),
|
||||
valueConverter.toCopycatData(msg.value()),
|
||||
msg.offset())
|
||||
);
|
||||
}
|
||||
|
|
|
@ -51,7 +51,8 @@ public class WorkerSourceTask implements WorkerTask {
|
|||
|
||||
private ConnectorTaskId id;
|
||||
private SourceTask task;
|
||||
private final Converter converter;
|
||||
private final Converter keyConverter;
|
||||
private final Converter valueConverter;
|
||||
private KafkaProducer<Object, Object> producer;
|
||||
private WorkerSourceTaskThread workThread;
|
||||
private OffsetStorageReader offsetReader;
|
||||
|
@ -69,13 +70,14 @@ public class WorkerSourceTask implements WorkerTask {
|
|||
private boolean flushing;
|
||||
|
||||
public WorkerSourceTask(ConnectorTaskId id, SourceTask task,
|
||||
Converter converter,
|
||||
Converter keyConverter, Converter valueConverter,
|
||||
KafkaProducer<Object, Object> producer,
|
||||
OffsetStorageReader offsetReader, OffsetStorageWriter offsetWriter,
|
||||
WorkerConfig workerConfig, Time time) {
|
||||
this.id = id;
|
||||
this.task = task;
|
||||
this.converter = converter;
|
||||
this.keyConverter = keyConverter;
|
||||
this.valueConverter = valueConverter;
|
||||
this.producer = producer;
|
||||
this.offsetReader = offsetReader;
|
||||
this.offsetWriter = offsetWriter;
|
||||
|
@ -132,8 +134,8 @@ public class WorkerSourceTask implements WorkerTask {
|
|||
for (SourceRecord record : records) {
|
||||
final ProducerRecord<Object, Object> producerRecord
|
||||
= new ProducerRecord<>(record.getTopic(), record.getKafkaPartition(),
|
||||
converter.fromCopycatData(record.getKey()),
|
||||
converter.fromCopycatData(record.getValue()));
|
||||
keyConverter.fromCopycatData(record.getKey()),
|
||||
valueConverter.fromCopycatData(record.getValue()));
|
||||
log.trace("Appending record with key {}, value {}", record.getKey(), record.getValue());
|
||||
if (!flushing) {
|
||||
outstandingMessages.put(producerRecord, producerRecord);
|
||||
|
|
|
@ -39,17 +39,19 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
|
|||
|
||||
private final OffsetBackingStore backingStore;
|
||||
private final String namespace;
|
||||
private final Converter converter;
|
||||
private final Converter keyConverter;
|
||||
private final Converter valueConverter;
|
||||
private final Serializer keySerializer;
|
||||
private final Deserializer valueDeserializer;
|
||||
|
||||
public OffsetStorageReaderImpl(OffsetBackingStore backingStore, String namespace,
|
||||
Converter converter,
|
||||
Converter keyConverter, Converter valueConverter,
|
||||
Serializer keySerializer,
|
||||
Deserializer valueDeserializer) {
|
||||
this.backingStore = backingStore;
|
||||
this.namespace = namespace;
|
||||
this.converter = converter;
|
||||
this.keyConverter = keyConverter;
|
||||
this.valueConverter = valueConverter;
|
||||
this.keySerializer = keySerializer;
|
||||
this.valueDeserializer = valueDeserializer;
|
||||
}
|
||||
|
@ -65,7 +67,7 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
|
|||
Map<ByteBuffer, Object> serializedToOriginal = new HashMap<>(partitions.size());
|
||||
for (Object key : partitions) {
|
||||
try {
|
||||
byte[] keySerialized = keySerializer.serialize(namespace, converter.fromCopycatData(key));
|
||||
byte[] keySerialized = keySerializer.serialize(namespace, keyConverter.fromCopycatData(key));
|
||||
ByteBuffer keyBuffer = (keySerialized != null) ? ByteBuffer.wrap(keySerialized) : null;
|
||||
serializedToOriginal.put(keyBuffer, key);
|
||||
} catch (Throwable t) {
|
||||
|
@ -95,7 +97,7 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
|
|||
continue;
|
||||
}
|
||||
Object origKey = serializedToOriginal.get(rawEntry.getKey());
|
||||
Object deserializedValue = converter.toCopycatData(
|
||||
Object deserializedValue = valueConverter.toCopycatData(
|
||||
valueDeserializer.deserialize(namespace, rawEntry.getValue().array())
|
||||
);
|
||||
|
||||
|
|
|
@ -67,7 +67,8 @@ public class OffsetStorageWriter {
|
|||
private static final Logger log = LoggerFactory.getLogger(OffsetStorageWriter.class);
|
||||
|
||||
private final OffsetBackingStore backingStore;
|
||||
private final Converter converter;
|
||||
private final Converter keyConverter;
|
||||
private final Converter valueConverter;
|
||||
private final Serializer keySerializer;
|
||||
private final Serializer valueSerializer;
|
||||
private final String namespace;
|
||||
|
@ -79,11 +80,12 @@ public class OffsetStorageWriter {
|
|||
private long currentFlushId = 0;
|
||||
|
||||
public OffsetStorageWriter(OffsetBackingStore backingStore,
|
||||
String namespace, Converter converter,
|
||||
String namespace, Converter keyConverter, Converter valueConverter,
|
||||
Serializer keySerializer, Serializer valueSerializer) {
|
||||
this.backingStore = backingStore;
|
||||
this.namespace = namespace;
|
||||
this.converter = converter;
|
||||
this.keyConverter = keyConverter;
|
||||
this.valueConverter = valueConverter;
|
||||
this.keySerializer = keySerializer;
|
||||
this.valueSerializer = valueSerializer;
|
||||
}
|
||||
|
@ -134,9 +136,9 @@ public class OffsetStorageWriter {
|
|||
try {
|
||||
offsetsSerialized = new HashMap<>();
|
||||
for (Map.Entry<Object, Object> entry : toFlush.entrySet()) {
|
||||
byte[] key = keySerializer.serialize(namespace, converter.fromCopycatData(entry.getKey()));
|
||||
byte[] key = keySerializer.serialize(namespace, keyConverter.fromCopycatData(entry.getKey()));
|
||||
ByteBuffer keyBuffer = (key != null) ? ByteBuffer.wrap(key) : null;
|
||||
byte[] value = valueSerializer.serialize(namespace, converter.fromCopycatData(entry.getValue()));
|
||||
byte[] value = valueSerializer.serialize(namespace, valueConverter.fromCopycatData(entry.getValue()));
|
||||
ByteBuffer valueBuffer = (value != null) ? ByteBuffer.wrap(value) : null;
|
||||
offsetsSerialized.put(keyBuffer, valueBuffer);
|
||||
}
|
||||
|
|
|
@ -60,7 +60,8 @@ public class WorkerSinkTaskTest extends ThreadedTest {
|
|||
private Time time;
|
||||
private SinkTask sinkTask;
|
||||
private WorkerConfig workerConfig;
|
||||
private Converter converter;
|
||||
private Converter keyConverter;
|
||||
private Converter valueConverter;
|
||||
private WorkerSinkTask workerTask;
|
||||
private KafkaConsumer<Object, Object> consumer;
|
||||
private WorkerSinkTaskThread workerThread;
|
||||
|
@ -73,16 +74,18 @@ public class WorkerSinkTaskTest extends ThreadedTest {
|
|||
time = new MockTime();
|
||||
sinkTask = PowerMock.createMock(SinkTask.class);
|
||||
Properties workerProps = new Properties();
|
||||
workerProps.setProperty("converter", "org.apache.kafka.copycat.json.JsonConverter");
|
||||
workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
|
||||
workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
|
||||
workerProps.setProperty("key.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
|
||||
workerProps.setProperty("value.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
|
||||
workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
|
||||
workerProps.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
|
||||
workerConfig = new WorkerConfig(workerProps);
|
||||
converter = PowerMock.createMock(Converter.class);
|
||||
keyConverter = PowerMock.createMock(Converter.class);
|
||||
valueConverter = PowerMock.createMock(Converter.class);
|
||||
workerTask = PowerMock.createPartialMock(
|
||||
WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"},
|
||||
taskId, sinkTask, workerConfig, converter, time);
|
||||
taskId, sinkTask, workerConfig, keyConverter, valueConverter, time);
|
||||
|
||||
recordsReturned = 0;
|
||||
}
|
||||
|
@ -135,9 +138,9 @@ public class WorkerSinkTaskTest extends ThreadedTest {
|
|||
new ConsumerRecord<Object, Object>("topic", 0, 0, rawKey, rawValue))));
|
||||
|
||||
// Exact data doesn't matter, but should be passed directly to sink task
|
||||
EasyMock.expect(converter.toCopycatData(rawKey))
|
||||
EasyMock.expect(keyConverter.toCopycatData(rawKey))
|
||||
.andReturn(record);
|
||||
EasyMock.expect(converter.toCopycatData(rawValue))
|
||||
EasyMock.expect(valueConverter.toCopycatData(rawValue))
|
||||
.andReturn(record);
|
||||
Capture<Collection<SinkRecord>> capturedRecords
|
||||
= EasyMock.newCapture(CaptureType.ALL);
|
||||
|
@ -313,8 +316,8 @@ public class WorkerSinkTaskTest extends ThreadedTest {
|
|||
return records;
|
||||
}
|
||||
});
|
||||
EasyMock.expect(converter.toCopycatData(KEY)).andReturn(KEY).anyTimes();
|
||||
EasyMock.expect(converter.toCopycatData(VALUE)).andReturn(VALUE).anyTimes();
|
||||
EasyMock.expect(keyConverter.toCopycatData(KEY)).andReturn(KEY).anyTimes();
|
||||
EasyMock.expect(valueConverter.toCopycatData(VALUE)).andReturn(VALUE).anyTimes();
|
||||
Capture<Collection<SinkRecord>> capturedRecords = EasyMock.newCapture(CaptureType.ALL);
|
||||
sinkTask.put(EasyMock.capture(capturedRecords));
|
||||
EasyMock.expectLastCall().anyTimes();
|
||||
|
|
|
@ -55,15 +55,18 @@ public class WorkerSourceTaskTest extends ThreadedTest {
|
|||
private static final byte[] PARTITION_BYTES = "partition".getBytes();
|
||||
private static final byte[] OFFSET_BYTES = "offset-1".getBytes();
|
||||
|
||||
private static final Integer KEY = -1;
|
||||
private static final Integer RECORD = 12;
|
||||
// The actual format of this data doesn't matter -- we just want to see that the right version
|
||||
// is used in the right place.
|
||||
private static final String CONVERTED_KEY = "converted-key";
|
||||
private static final String CONVERTED_RECORD = "converted-record";
|
||||
|
||||
private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
|
||||
private WorkerConfig config;
|
||||
private SourceTask sourceTask;
|
||||
private Converter converter;
|
||||
private Converter keyConverter;
|
||||
private Converter valueConverter;
|
||||
private KafkaProducer<Object, Object> producer;
|
||||
private OffsetStorageReader offsetReader;
|
||||
private OffsetStorageWriter offsetWriter;
|
||||
|
@ -74,22 +77,23 @@ public class WorkerSourceTaskTest extends ThreadedTest {
|
|||
|
||||
private static final Properties EMPTY_TASK_PROPS = new Properties();
|
||||
private static final List<SourceRecord> RECORDS = Arrays.asList(
|
||||
new SourceRecord(PARTITION_BYTES, OFFSET_BYTES,
|
||||
"topic", RECORD)
|
||||
new SourceRecord(PARTITION_BYTES, OFFSET_BYTES, "topic", null, KEY, RECORD)
|
||||
);
|
||||
|
||||
@Override
|
||||
public void setup() {
|
||||
super.setup();
|
||||
Properties workerProps = new Properties();
|
||||
workerProps.setProperty("converter", "org.apache.kafka.copycat.json.JsonConverter");
|
||||
workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
|
||||
workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
|
||||
workerProps.setProperty("key.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
|
||||
workerProps.setProperty("value.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
|
||||
workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
|
||||
workerProps.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
|
||||
config = new WorkerConfig(workerProps);
|
||||
sourceTask = PowerMock.createMock(SourceTask.class);
|
||||
converter = PowerMock.createMock(Converter.class);
|
||||
keyConverter = PowerMock.createMock(Converter.class);
|
||||
valueConverter = PowerMock.createMock(Converter.class);
|
||||
producer = PowerMock.createMock(KafkaProducer.class);
|
||||
offsetReader = PowerMock.createMock(OffsetStorageReader.class);
|
||||
offsetWriter = PowerMock.createMock(OffsetStorageWriter.class);
|
||||
|
@ -98,9 +102,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
|
|||
}
|
||||
|
||||
private void createWorkerTask() {
|
||||
workerTask = new WorkerSourceTask(taskId, sourceTask, converter, producer,
|
||||
offsetReader, offsetWriter,
|
||||
config, new SystemTime());
|
||||
workerTask = new WorkerSourceTask(taskId, sourceTask, keyConverter, valueConverter, producer,
|
||||
offsetReader, offsetWriter, config, new SystemTime());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -193,14 +196,14 @@ public class WorkerSourceTaskTest extends ThreadedTest {
|
|||
|
||||
List<SourceRecord> records = new ArrayList<>();
|
||||
// Can just use the same record for key and value
|
||||
records.add(new SourceRecord(PARTITION_BYTES, OFFSET_BYTES, "topic", null, RECORD, RECORD));
|
||||
records.add(new SourceRecord(PARTITION_BYTES, OFFSET_BYTES, "topic", null, KEY, RECORD));
|
||||
|
||||
Capture<ProducerRecord> sent = expectSendRecord();
|
||||
|
||||
PowerMock.replayAll();
|
||||
|
||||
Whitebox.invokeMethod(workerTask, "sendRecords", records);
|
||||
assertEquals(CONVERTED_RECORD, sent.getValue().key());
|
||||
assertEquals(CONVERTED_KEY, sent.getValue().key());
|
||||
assertEquals(CONVERTED_RECORD, sent.getValue().value());
|
||||
|
||||
PowerMock.verifyAll();
|
||||
|
@ -226,8 +229,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
|
|||
}
|
||||
|
||||
private Capture<ProducerRecord> expectSendRecord() throws InterruptedException {
|
||||
EasyMock.expect(converter.fromCopycatData(null)).andStubReturn(null);
|
||||
EasyMock.expect(converter.fromCopycatData(RECORD)).andStubReturn(CONVERTED_RECORD);
|
||||
EasyMock.expect(keyConverter.fromCopycatData(KEY)).andStubReturn(CONVERTED_KEY);
|
||||
EasyMock.expect(valueConverter.fromCopycatData(RECORD)).andStubReturn(CONVERTED_RECORD);
|
||||
|
||||
Capture<ProducerRecord> sent = EasyMock.newCapture();
|
||||
// 1. Converted data passed to the producer, which will need callbacks invoked for flush to work
|
||||
|
|
|
@ -60,7 +60,8 @@ public class WorkerTest extends ThreadedTest {
|
|||
super.setup();
|
||||
|
||||
Properties workerProps = new Properties();
|
||||
workerProps.setProperty("converter", "org.apache.kafka.copycat.json.JsonConverter");
|
||||
workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
|
||||
workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
|
||||
workerProps.setProperty("key.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
|
||||
workerProps.setProperty("value.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
|
||||
workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
|
||||
|
@ -87,6 +88,7 @@ public class WorkerTest extends ThreadedTest {
|
|||
PowerMock.expectNew(
|
||||
WorkerSourceTask.class, EasyMock.eq(taskId), EasyMock.eq(task),
|
||||
EasyMock.anyObject(Converter.class),
|
||||
EasyMock.anyObject(Converter.class),
|
||||
EasyMock.anyObject(KafkaProducer.class),
|
||||
EasyMock.anyObject(OffsetStorageReader.class),
|
||||
EasyMock.anyObject(OffsetStorageWriter.class),
|
||||
|
@ -132,6 +134,7 @@ public class WorkerTest extends ThreadedTest {
|
|||
PowerMock.expectNew(
|
||||
WorkerSourceTask.class, EasyMock.eq(taskId), EasyMock.eq(task),
|
||||
EasyMock.anyObject(Converter.class),
|
||||
EasyMock.anyObject(Converter.class),
|
||||
EasyMock.anyObject(KafkaProducer.class),
|
||||
EasyMock.anyObject(OffsetStorageReader.class),
|
||||
EasyMock.anyObject(OffsetStorageWriter.class),
|
||||
|
|
|
@ -49,7 +49,8 @@ public class OffsetStorageWriterTest {
|
|||
ByteBuffer.wrap(OFFSET_VALUE_SERIALIZED));
|
||||
|
||||
private OffsetBackingStore store;
|
||||
private Converter converter;
|
||||
private Converter keyConverter;
|
||||
private Converter valueConverter;
|
||||
private Serializer keySerializer;
|
||||
private Serializer valueSerializer;
|
||||
private OffsetStorageWriter writer;
|
||||
|
@ -61,10 +62,11 @@ public class OffsetStorageWriterTest {
|
|||
@Before
|
||||
public void setup() {
|
||||
store = PowerMock.createMock(OffsetBackingStore.class);
|
||||
converter = PowerMock.createMock(Converter.class);
|
||||
keyConverter = PowerMock.createMock(Converter.class);
|
||||
valueConverter = PowerMock.createMock(Converter.class);
|
||||
keySerializer = PowerMock.createMock(Serializer.class);
|
||||
valueSerializer = PowerMock.createMock(Serializer.class);
|
||||
writer = new OffsetStorageWriter(store, NAMESPACE, converter, keySerializer, valueSerializer);
|
||||
writer = new OffsetStorageWriter(store, NAMESPACE, keyConverter, valueConverter, keySerializer, valueSerializer);
|
||||
|
||||
service = Executors.newFixedThreadPool(1);
|
||||
}
|
||||
|
@ -193,9 +195,9 @@ public class OffsetStorageWriterTest {
|
|||
private void expectStore(final Callback<Void> callback,
|
||||
final boolean fail,
|
||||
final CountDownLatch waitForCompletion) {
|
||||
EasyMock.expect(converter.fromCopycatData(OFFSET_KEY)).andReturn(OFFSET_KEY_CONVERTED);
|
||||
EasyMock.expect(keyConverter.fromCopycatData(OFFSET_KEY)).andReturn(OFFSET_KEY_CONVERTED);
|
||||
EasyMock.expect(keySerializer.serialize(NAMESPACE, OFFSET_KEY_CONVERTED)).andReturn(OFFSET_KEY_SERIALIZED);
|
||||
EasyMock.expect(converter.fromCopycatData(OFFSET_VALUE)).andReturn(OFFSET_VALUE_CONVERTED);
|
||||
EasyMock.expect(valueConverter.fromCopycatData(OFFSET_VALUE)).andReturn(OFFSET_VALUE_CONVERTED);
|
||||
EasyMock.expect(valueSerializer.serialize(NAMESPACE, OFFSET_VALUE_CONVERTED)).andReturn(OFFSET_VALUE_SERIALIZED);
|
||||
|
||||
final Capture<Callback<Void>> storeCallback = Capture.newInstance();
|
||||
|
|
Loading…
Reference in New Issue