Remove offset serializers, instead reusing the existing serializers and removing schema projection support.

This commit is contained in:
Ewen Cheslack-Postava 2015-07-29 01:28:49 -07:00
parent e849e10c7e
commit 5a618c6815
18 changed files with 91 additions and 334 deletions

View File

@ -145,6 +145,7 @@
<subpackage name="storage">
<allow pkg="org.apache.kafka.copycat" />
<allow pkg="org.apache.kafka.common.serialization" />
<!-- for tests -->
<allow pkg="org.easymock" />
<allow pkg="org.powermock" />

View File

@ -16,6 +16,13 @@
bootstrap.servers=localhost:9092
schema.registry.url=http://localhost:8081
# TODO: Non-avro built-ins?
converter=org.apache.kafka.copycat.avro.AvroConverter
key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
key.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
offset.storage.class=org.apache.kafka.copycat.storage.FileOffsetBackingStore
offset.storage.file.filename=/tmp/copycat.offsets
# Flush much faster than normal, which is useful for testing/debugging

View File

@ -15,4 +15,11 @@
# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092
schema.registry.url=http://localhost:8081
schema.registry.url=http://localhost:8081
# TODO: Non-avro built-ins?
converter=org.apache.kafka.copycat.avro.AvroConverter
key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
key.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer

View File

@ -1,58 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.storage;
import org.apache.kafka.copycat.data.Schema;
import java.io.Closeable;
import java.util.Map;
/**
* Deserializer for Copycat offsets.
* @param <T>
*/
public interface OffsetDeserializer<T> extends Closeable {
/**
* Configure this class.
* @param configs configs in key/value pairs
* @param isKey whether is for key or value
*/
public void configure(Map<String, ?> configs, boolean isKey);
/**
* Deserialize an offset key or value from the specified connector.
* @param connector connector associated with the data
* @param data serialized bytes
* @return deserialized typed data
*/
public T deserializeOffset(String connector, byte[] data);
/**
* Deserialize an offset key or value from the specified connector using a schema.
* @param connector connector associated with the data
* @param data serialized bytes
* @param schema schema to deserialize to
* @return deserialized typed data
*/
public T deserializeOffset(String connector, byte[] data, Schema schema);
@Override
public void close();
}

View File

@ -1,48 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.storage;
import java.io.Closeable;
import java.util.Map;
/**
* Serializer for Copycat offsets.
* @param <T> native type of offsets.
*/
public interface OffsetSerializer<T> extends Closeable {
/**
* Configure this class.
* @param configs configs in key/value pairs
* @param isKey whether is for key or value
*/
public void configure(Map<String, ?> configs, boolean isKey);
/**
* @param connector the connector associated with offsets
* @param data typed data
* @return serialized bytes
*/
public byte[] serializeOffset(String connector, T data);
/**
* Close this serializer.
*/
@Override
public void close();
}

View File

@ -17,8 +17,6 @@
package org.apache.kafka.copycat.storage;
import org.apache.kafka.copycat.data.Schema;
import java.util.Collection;
import java.util.Map;
@ -33,15 +31,14 @@ public interface OffsetStorageReader {
* gets it from the backing store, which may require some network round trips.
*
* @param stream object uniquely identifying the stream of data
* @param schema schema used to decode the offset value
* @return object uniquely identifying the offset in the stream of data
*/
public Object getOffset(Object stream, Schema schema);
public Object getOffset(Object stream);
/**
* <p>
* Get a set of offsets for the specified stream identifiers. This may be more efficient
* than calling {@link #getOffset(Object, Schema)} repeatedly.
* than calling {@link #getOffset(Object)} repeatedly.
* </p>
* <p>
* Note that when errors occur, this method omits the associated data and tries to return as
@ -53,8 +50,7 @@ public interface OffsetStorageReader {
* </p>
*
* @param streams set of identifiers for streams of data
* @param schema schema used to decode offset values
* @return a map of stream identifiers to decoded offsets
*/
public Map<Object, Object> getOffsets(Collection<Object> streams, Schema schema);
public Map<Object, Object> getOffsets(Collection<Object> streams);
}

View File

@ -1,50 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.avro;
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.copycat.data.Schema;
import org.apache.kafka.copycat.storage.OffsetDeserializer;
import java.util.Map;
public class AvroDeserializer extends AbstractKafkaAvroDeserializer implements OffsetDeserializer<Object> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
configure(new KafkaAvroDeserializerConfig(configs));
}
@Override
public Object deserializeOffset(String connector, byte[] data) {
// TODO: Support schema projection
return deserialize(data);
}
@Override
public Object deserializeOffset(String connector, byte[] data, Schema schema) {
// TODO: Support schema projection
return deserialize(data);
}
@Override
public void close() {
}
}

View File

@ -1,66 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.avro;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerializer;
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
import org.apache.kafka.copycat.storage.OffsetSerializer;
import java.util.Map;
public class AvroSerializer extends AbstractKafkaAvroSerializer implements OffsetSerializer<Object> {
private boolean isKey;
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
this.isKey = isKey;
Object url = configs.get(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG);
if (url == null) {
throw new CopycatRuntimeException("Missing Schema registry url!");
}
Object maxSchemaObject = configs.get(
AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_CONFIG);
if (maxSchemaObject == null) {
schemaRegistry = new CachedSchemaRegistryClient(
(String) url, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
} else {
schemaRegistry = new CachedSchemaRegistryClient(
(String) url, (Integer) maxSchemaObject);
}
}
@Override
public byte[] serializeOffset(String connector, Object data) {
String subject;
if (isKey) {
subject = connector + "-key";
} else {
subject = connector + "-value";
}
return serializeImpl(subject, data);
}
@Override
public void close() {
}
}

View File

@ -17,8 +17,6 @@
package org.apache.kafka.copycat.file;
import org.apache.kafka.copycat.data.Schema;
import org.apache.kafka.copycat.data.SchemaBuilder;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
import org.apache.kafka.copycat.source.SourceRecord;
@ -55,9 +53,7 @@ public class FileStreamSourceTask extends SourceTask<Object, Object> {
} else {
try {
stream = new FileInputStream(filename);
Schema longSchema = SchemaBuilder.builder().longType();
Long lastRecordedOffset
= (Long) context.getOffsetStorageReader().getOffset(null, longSchema);
Long lastRecordedOffset = (Long) context.getOffsetStorageReader().getOffset(null);
if (lastRecordedOffset != null) {
log.debug("Found previous offset, trying to skip to file offset {}", lastRecordedOffset);
long skipLeft = lastRecordedOffset;

View File

@ -17,7 +17,6 @@
package org.apache.kafka.copycat.file;
import org.apache.kafka.copycat.data.Schema;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
import org.apache.kafka.copycat.source.SourceRecord;
@ -137,8 +136,7 @@ public class FileStreamSourceTaskTest {
private void expectOffsetLookupReturnNone() {
EasyMock.expect(
offsetStorageReader.getOffset(
EasyMock.anyObject(Object.class), EasyMock.anyObject(Schema.class)))
offsetStorageReader.getOffset(EasyMock.anyObject(Object.class)))
.andReturn(null);
}
}

View File

@ -52,58 +52,22 @@ public class WorkerConfig extends AbstractConfig {
public static final String CONVERTER_CLASS_CONFIG = "converter";
public static final String CONVERTER_CLASS_DOC =
"Converter class for Copycat data that implements the <code>Converter</code> interface.";
public static final String CONVERTER_CLASS_DEFAULT
= "org.apache.kafka.copycat.avro.AvroConverter"; // TODO: Non-avro built-in?
public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
public static final String KEY_SERIALIZER_CLASS_DOC =
"Serializer class for key that implements the <code>Serializer</code> interface.";
public static final String KEY_SERIALIZER_CLASS_DEFAULT
= "io.confluent.kafka.serializers.KafkaAvroSerializer"; // TODO: Non-avro built-in?
public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
public static final String VALUE_SERIALIZER_CLASS_DOC =
"Serializer class for value that implements the <code>Serializer</code> interface.";
public static final String VALUE_SERIALIZER_CLASS_DEFAULT
= "io.confluent.kafka.serializers.KafkaAvroSerializer"; // TODO: Non-avro built-in?
public static final String OFFSET_KEY_SERIALIZER_CLASS_CONFIG = "offset.key.serializer";
public static final String OFFSET_KEY_SERIALIZER_CLASS_DOC =
"Serializer class for key that implements the <code>OffsetSerializer</code> interface.";
public static final String OFFSET_KEY_SERIALIZER_CLASS_DEFAULT
= "org.apache.kafka.copycat.avro.AvroSerializer"; // TODO: Non-avro built-in?
public static final String OFFSET_VALUE_SERIALIZER_CLASS_CONFIG = "offset.value.serializer";
public static final String OFFSET_VALUE_SERIALIZER_CLASS_DOC =
"Serializer class for value that implements the <code>OffsetSerializer</code> interface.";
public static final String OFFSET_VALUE_SERIALIZER_CLASS_DEFAULT
= "org.apache.kafka.copycat.avro.AvroSerializer"; // TODO: Non-avro built-in?
public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";
public static final String KEY_DESERIALIZER_CLASS_DOC =
"Serializer class for key that implements the <code>Deserializer</code> interface.";
public static final String KEY_DESERIALIZER_CLASS_DEFAULT
= "io.confluent.kafka.serializers.KafkaAvroDeserializer"; // TODO: Non-avro built-in?
public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";
public static final String VALUE_DESERIALIZER_CLASS_DOC =
"Deserializer class for value that implements the <code>Deserializer</code> interface.";
public static final String VALUE_DESERIALIZER_CLASS_DEFAULT
= "io.confluent.kafka.serializers.KafkaAvroDeserializer"; // TODO: Non-avro built-in?
public static final String OFFSET_KEY_DESERIALIZER_CLASS_CONFIG = "offset.key.deserializer";
public static final String OFFSET_KEY_DESERIALIZER_CLASS_DOC =
"Deserializer class for key that implements the <code>OffsetDeserializer</code> interface.";
public static final String OFFSET_KEY_DESERIALIZER_CLASS_DEFAULT
= "org.apache.kafka.copycat.avro.AvroDeserializer"; // TODO: Non-avro built-in?
public static final String OFFSET_VALUE_DESERIALIZER_CLASS_CONFIG = "offset.value.deserializer";
public static final String OFFSET_VALUE_DESERIALIZER_CLASS_DOC =
"Deserializer class for value that implements the <code>OffsetDeserializer</code> interface.";
public static final String OFFSET_VALUE_DESERIALIZER_CLASS_DEFAULT
= "org.apache.kafka.copycat.avro.AvroDeserializer"; // TODO: Non-avro built-in?
public static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG
= "task.shutdown.graceful.timeout.ms";
@ -137,27 +101,16 @@ public class WorkerConfig extends AbstractConfig {
.define(CLUSTER_CONFIG, Type.STRING, CLUSTER_DEFAULT, Importance.HIGH, CLUSTER_CONFIG_DOC)
.define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, BOOTSTRAP_SERVERS_DEFAULT,
Importance.HIGH, BOOSTRAP_SERVERS_DOC)
.define(CONVERTER_CLASS_CONFIG, Type.CLASS, CONVERTER_CLASS_DEFAULT,
.define(CONVERTER_CLASS_CONFIG, Type.CLASS,
Importance.HIGH, CONVERTER_CLASS_DOC)
.define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, KEY_SERIALIZER_CLASS_DEFAULT,
.define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
.define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, VALUE_SERIALIZER_CLASS_DEFAULT,
.define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC)
.define(OFFSET_KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, OFFSET_KEY_SERIALIZER_CLASS_DEFAULT,
Importance.HIGH, OFFSET_KEY_SERIALIZER_CLASS_DOC)
.define(OFFSET_VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
OFFSET_VALUE_SERIALIZER_CLASS_DEFAULT,
Importance.HIGH, OFFSET_VALUE_SERIALIZER_CLASS_DOC)
.define(KEY_DESERIALIZER_CLASS_CONFIG, Type.CLASS, KEY_DESERIALIZER_CLASS_DEFAULT,
.define(KEY_DESERIALIZER_CLASS_CONFIG, Type.CLASS,
Importance.HIGH, KEY_DESERIALIZER_CLASS_DOC)
.define(VALUE_DESERIALIZER_CLASS_CONFIG, Type.CLASS, VALUE_DESERIALIZER_CLASS_DEFAULT,
.define(VALUE_DESERIALIZER_CLASS_CONFIG, Type.CLASS,
Importance.HIGH, VALUE_DESERIALIZER_CLASS_DOC)
.define(OFFSET_KEY_DESERIALIZER_CLASS_CONFIG, Type.CLASS,
OFFSET_KEY_DESERIALIZER_CLASS_DEFAULT,
Importance.HIGH, OFFSET_KEY_DESERIALIZER_CLASS_DOC)
.define(OFFSET_VALUE_DESERIALIZER_CLASS_CONFIG, Type.CLASS,
OFFSET_VALUE_DESERIALIZER_CLASS_DEFAULT,
Importance.HIGH, OFFSET_VALUE_DESERIALIZER_CLASS_DOC)
.define(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Type.LONG,
TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT, Importance.LOW,
TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC)

View File

@ -17,6 +17,8 @@
package org.apache.kafka.copycat.runtime;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.clients.producer.KafkaProducer;
@ -54,10 +56,10 @@ public class Worker {
private WorkerConfig config;
private Converter converter;
private OffsetBackingStore offsetBackingStore;
private OffsetSerializer offsetKeySerializer;
private OffsetSerializer offsetValueSerializer;
private OffsetDeserializer offsetKeyDeserializer;
private OffsetDeserializer offsetValueDeserializer;
private Serializer offsetKeySerializer;
private Serializer offsetValueSerializer;
private Deserializer offsetKeyDeserializer;
private Deserializer offsetValueDeserializer;
private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<ConnectorTaskId, WorkerTask>();
private KafkaProducer producer;
private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
@ -71,8 +73,8 @@ public class Worker {
}
public Worker(Time time, WorkerConfig config, OffsetBackingStore offsetBackingStore,
OffsetSerializer offsetKeySerializer, OffsetSerializer offsetValueSerializer,
OffsetDeserializer offsetKeyDeserializer, OffsetDeserializer offsetValueDeserializer) {
Serializer offsetKeySerializer, Serializer offsetValueSerializer,
Deserializer offsetKeyDeserializer, Deserializer offsetValueDeserializer) {
this.time = time;
this.config = config;
this.converter = Reflection.instantiate(config.getClass(WorkerConfig.CONVERTER_CLASS_CONFIG).getName(),
@ -83,8 +85,7 @@ public class Worker {
this.offsetKeySerializer = offsetKeySerializer;
} else {
this.offsetKeySerializer = Reflection.instantiate(
config.getClass(WorkerConfig.OFFSET_KEY_SERIALIZER_CLASS_CONFIG).getName(),
OffsetSerializer.class);
config.getClass(WorkerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName(), Serializer.class);
this.offsetKeySerializer.configure(config.getOriginalProperties(), true);
}
@ -92,8 +93,7 @@ public class Worker {
this.offsetValueSerializer = offsetValueSerializer;
} else {
this.offsetValueSerializer = Reflection.instantiate(
config.getClass(WorkerConfig.OFFSET_VALUE_SERIALIZER_CLASS_CONFIG).getName(),
OffsetSerializer.class);
config.getClass(WorkerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName(), Serializer.class);
this.offsetValueSerializer.configure(config.getOriginalProperties(), false);
}
@ -101,8 +101,7 @@ public class Worker {
this.offsetKeyDeserializer = offsetKeyDeserializer;
} else {
this.offsetKeyDeserializer = Reflection.instantiate(
config.getClass(WorkerConfig.OFFSET_KEY_DESERIALIZER_CLASS_CONFIG).getName(),
OffsetDeserializer.class);
config.getClass(WorkerConfig.KEY_DESERIALIZER_CLASS_CONFIG).getName(), Deserializer.class);
this.offsetKeyDeserializer.configure(config.getOriginalProperties(), true);
}
@ -110,8 +109,7 @@ public class Worker {
this.offsetValueDeserializer = offsetValueDeserializer;
} else {
this.offsetValueDeserializer = Reflection.instantiate(
config.getClass(WorkerConfig.OFFSET_VALUE_DESERIALIZER_CLASS_CONFIG).getName(),
OffsetDeserializer.class);
config.getClass(WorkerConfig.VALUE_DESERIALIZER_CLASS_CONFIG).getName(), Deserializer.class);
this.offsetValueDeserializer.configure(config.getOriginalProperties(), false);
}
}

View File

@ -17,7 +17,8 @@
package org.apache.kafka.copycat.storage;
import org.apache.kafka.copycat.data.Schema;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -39,13 +40,13 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
private final OffsetBackingStore backingStore;
private final String namespace;
private final Converter converter;
private final OffsetSerializer keySerializer;
private final OffsetDeserializer valueDeserializer;
private final Serializer keySerializer;
private final Deserializer valueDeserializer;
public OffsetStorageReaderImpl(OffsetBackingStore backingStore, String namespace,
Converter converter,
OffsetSerializer keySerializer,
OffsetDeserializer valueDeserializer) {
Serializer keySerializer,
Deserializer valueDeserializer) {
this.backingStore = backingStore;
this.namespace = namespace;
this.converter = converter;
@ -54,18 +55,17 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
}
@Override
public Object getOffset(Object stream, Schema schema) {
return getOffsets(Arrays.asList(stream), schema).get(stream);
public Object getOffset(Object stream) {
return getOffsets(Arrays.asList(stream)).get(stream);
}
@Override
public Map<Object, Object> getOffsets(Collection<Object> streams, Schema schema) {
public Map<Object, Object> getOffsets(Collection<Object> streams) {
// Serialize keys so backing store can work with them
Map<ByteBuffer, Object> serializedToOriginal = new HashMap<ByteBuffer, Object>(streams.size());
for (Object key : streams) {
try {
byte[] keySerialized = keySerializer.serializeOffset(namespace,
converter.fromCopycatData(key));
byte[] keySerialized = keySerializer.serialize(namespace, converter.fromCopycatData(key));
ByteBuffer keyBuffer = (keySerialized != null) ? ByteBuffer.wrap(keySerialized) : null;
serializedToOriginal.put(keyBuffer, key);
} catch (Throwable t) {
@ -96,7 +96,7 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
}
Object origKey = serializedToOriginal.get(rawEntry.getKey());
Object deserializedValue = converter.toCopycatData(
valueDeserializer.deserializeOffset(namespace, rawEntry.getValue().array(), schema)
valueDeserializer.deserialize(namespace, rawEntry.getValue().array())
);
result.put(origKey, deserializedValue);

View File

@ -17,6 +17,7 @@
package org.apache.kafka.copycat.storage;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
import org.apache.kafka.copycat.util.Callback;
import org.slf4j.Logger;
@ -67,8 +68,8 @@ public class OffsetStorageWriter {
private final OffsetBackingStore backingStore;
private final Converter converter;
private final OffsetSerializer keySerializer;
private final OffsetSerializer valueSerializer;
private final Serializer keySerializer;
private final Serializer valueSerializer;
private final String namespace;
private Map<Object, Object> data = new HashMap<Object, Object>();
@ -79,7 +80,7 @@ public class OffsetStorageWriter {
public OffsetStorageWriter(OffsetBackingStore backingStore,
String namespace, Converter converter,
OffsetSerializer keySerializer, OffsetSerializer valueSerializer) {
Serializer keySerializer, Serializer valueSerializer) {
this.backingStore = backingStore;
this.namespace = namespace;
this.converter = converter;
@ -134,11 +135,9 @@ public class OffsetStorageWriter {
try {
offsetsSerialized = new HashMap<ByteBuffer, ByteBuffer>();
for (Map.Entry<Object, Object> entry : toFlush.entrySet()) {
byte[] key = keySerializer.serializeOffset(namespace,
converter.fromCopycatData(entry.getKey()));
byte[] key = keySerializer.serialize(namespace, converter.fromCopycatData(entry.getKey()));
ByteBuffer keyBuffer = (key != null) ? ByteBuffer.wrap(key) : null;
byte[] value = valueSerializer.serializeOffset(namespace,
converter.fromCopycatData(entry.getValue()));
byte[] value = valueSerializer.serialize(namespace, converter.fromCopycatData(entry.getValue()));
ByteBuffer valueBuffer = (value != null) ? ByteBuffer.wrap(value) : null;
offsetsSerialized.put(keyBuffer, valueBuffer);
}

View File

@ -76,7 +76,14 @@ public class WorkerSinkTaskTest extends ThreadedTest {
super.setup();
time = new MockTime();
sinkTask = PowerMock.createMock(SinkTask.class);
workerConfig = new WorkerConfig();
Properties workerProps = new Properties();
// TODO: Non-avro built-ins?
workerProps.setProperty("converter", "org.apache.kafka.copycat.avro.AvroConverter");
workerProps.setProperty("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
workerProps.setProperty("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
workerProps.setProperty("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
workerProps.setProperty("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
workerConfig = new WorkerConfig(workerProps);
converter = PowerMock.createMock(Converter.class);
workerTask = PowerMock.createPartialMock(
WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"},

View File

@ -66,7 +66,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
private static final String CONVERTED_RECORD = "converted-record";
private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
private WorkerConfig config = new WorkerConfig();
private WorkerConfig config;
private SourceTask sourceTask;
private Converter converter;
private KafkaProducer<Object, Object> producer;
@ -86,6 +86,14 @@ public class WorkerSourceTaskTest extends ThreadedTest {
@Override
public void setup() {
super.setup();
Properties workerProps = new Properties();
// TODO: Non-avro built-ins?
workerProps.setProperty("converter", "org.apache.kafka.copycat.avro.AvroConverter");
workerProps.setProperty("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
workerProps.setProperty("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
workerProps.setProperty("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
workerProps.setProperty("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
config = new WorkerConfig(workerProps);
sourceTask = PowerMock.createMock(SourceTask.class);
converter = PowerMock.createMock(Converter.class);
producer = PowerMock.createMock(KafkaProducer.class);

View File

@ -17,6 +17,8 @@
package org.apache.kafka.copycat.runtime;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.copycat.cli.WorkerConfig;
@ -48,18 +50,24 @@ public class WorkerTest extends ThreadedTest {
private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
private Worker worker;
private OffsetBackingStore offsetBackingStore = PowerMock.createMock(OffsetBackingStore.class);
private OffsetSerializer offsetKeySerializer = PowerMock.createMock(OffsetSerializer.class);
private OffsetSerializer offsetValueSerializer = PowerMock.createMock(OffsetSerializer.class);
private OffsetDeserializer offsetKeyDeserializer = PowerMock.createMock(OffsetDeserializer.class);
private OffsetDeserializer offsetValueDeserializer = PowerMock.createMock(OffsetDeserializer.class);
private Serializer offsetKeySerializer = PowerMock.createMock(Serializer.class);
private Serializer offsetValueSerializer = PowerMock.createMock(Serializer.class);
private Deserializer offsetKeyDeserializer = PowerMock.createMock(Deserializer.class);
private Deserializer offsetValueDeserializer = PowerMock.createMock(Deserializer.class);
@Before
public void setup() {
super.setup();
// TODO: Remove schema registry URL
// TODO: Non-avro built-ins?
Properties workerProps = new Properties();
workerProps.setProperty("schema.registry.url", "http://localhost:8081");
workerProps.setProperty("converter", "org.apache.kafka.copycat.avro.AvroConverter");
workerProps.setProperty("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
workerProps.setProperty("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
workerProps.setProperty("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
workerProps.setProperty("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
WorkerConfig config = new WorkerConfig(workerProps);
worker = new Worker(new MockTime(), config, offsetBackingStore,
offsetKeySerializer, offsetValueSerializer,

View File

@ -17,6 +17,7 @@
package org.apache.kafka.copycat.storage;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
import org.apache.kafka.copycat.util.Callback;
import org.easymock.Capture;
@ -49,8 +50,8 @@ public class OffsetStorageWriterTest {
private OffsetBackingStore store;
private Converter converter;
private OffsetSerializer keySerializer;
private OffsetSerializer valueSerializer;
private Serializer keySerializer;
private Serializer valueSerializer;
private OffsetStorageWriter writer;
private static Exception exception = new RuntimeException("error");
@ -61,8 +62,8 @@ public class OffsetStorageWriterTest {
public void setup() {
store = PowerMock.createMock(OffsetBackingStore.class);
converter = PowerMock.createMock(Converter.class);
keySerializer = PowerMock.createMock(OffsetSerializer.class);
valueSerializer = PowerMock.createMock(OffsetSerializer.class);
keySerializer = PowerMock.createMock(Serializer.class);
valueSerializer = PowerMock.createMock(Serializer.class);
writer = new OffsetStorageWriter(store, NAMESPACE, converter, keySerializer, valueSerializer);
service = Executors.newFixedThreadPool(1);
@ -193,9 +194,9 @@ public class OffsetStorageWriterTest {
final boolean fail,
final CountDownLatch waitForCompletion) {
EasyMock.expect(converter.fromCopycatData(OFFSET_KEY)).andReturn(OFFSET_KEY_CONVERTED);
EasyMock.expect(keySerializer.serializeOffset(NAMESPACE, OFFSET_KEY_CONVERTED)).andReturn(OFFSET_KEY_SERIALIZED);
EasyMock.expect(keySerializer.serialize(NAMESPACE, OFFSET_KEY_CONVERTED)).andReturn(OFFSET_KEY_SERIALIZED);
EasyMock.expect(converter.fromCopycatData(OFFSET_VALUE)).andReturn(OFFSET_VALUE_CONVERTED);
EasyMock.expect(valueSerializer.serializeOffset(NAMESPACE, OFFSET_VALUE_CONVERTED)).andReturn(OFFSET_VALUE_SERIALIZED);
EasyMock.expect(valueSerializer.serialize(NAMESPACE, OFFSET_VALUE_CONVERTED)).andReturn(OFFSET_VALUE_SERIALIZED);
final Capture<Callback<Void>> storeCallback = Capture.newInstance();
EasyMock.expect(store.set(EasyMock.eq(NAMESPACE), EasyMock.eq(OFFSETS_SERIALIZED),