Remove offset serializers, instead reusing the existing serializers and removing schema projection support.

This commit is contained in:
Ewen Cheslack-Postava 2015-07-29 01:28:49 -07:00
parent e849e10c7e
commit 5a618c6815
18 changed files with 91 additions and 334 deletions

View File

@ -145,6 +145,7 @@
<subpackage name="storage"> <subpackage name="storage">
<allow pkg="org.apache.kafka.copycat" /> <allow pkg="org.apache.kafka.copycat" />
<allow pkg="org.apache.kafka.common.serialization" />
<!-- for tests --> <!-- for tests -->
<allow pkg="org.easymock" /> <allow pkg="org.easymock" />
<allow pkg="org.powermock" /> <allow pkg="org.powermock" />

View File

@ -16,6 +16,13 @@
bootstrap.servers=localhost:9092 bootstrap.servers=localhost:9092
schema.registry.url=http://localhost:8081 schema.registry.url=http://localhost:8081
# TODO: Non-avro built-ins?
converter=org.apache.kafka.copycat.avro.AvroConverter
key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
key.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
offset.storage.class=org.apache.kafka.copycat.storage.FileOffsetBackingStore offset.storage.class=org.apache.kafka.copycat.storage.FileOffsetBackingStore
offset.storage.file.filename=/tmp/copycat.offsets offset.storage.file.filename=/tmp/copycat.offsets
# Flush much faster than normal, which is useful for testing/debugging # Flush much faster than normal, which is useful for testing/debugging

View File

@ -15,4 +15,11 @@
# These are defaults. This file just demonstrates how to override some settings. # These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092 bootstrap.servers=localhost:9092
schema.registry.url=http://localhost:8081 schema.registry.url=http://localhost:8081
# TODO: Non-avro built-ins?
converter=org.apache.kafka.copycat.avro.AvroConverter
key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
key.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer

View File

@ -1,58 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.storage;
import org.apache.kafka.copycat.data.Schema;
import java.io.Closeable;
import java.util.Map;
/**
* Deserializer for Copycat offsets.
* @param <T>
*/
public interface OffsetDeserializer<T> extends Closeable {
/**
* Configure this class.
* @param configs configs in key/value pairs
* @param isKey whether is for key or value
*/
public void configure(Map<String, ?> configs, boolean isKey);
/**
* Deserialize an offset key or value from the specified connector.
* @param connector connector associated with the data
* @param data serialized bytes
* @return deserialized typed data
*/
public T deserializeOffset(String connector, byte[] data);
/**
* Deserialize an offset key or value from the specified connector using a schema.
* @param connector connector associated with the data
* @param data serialized bytes
* @param schema schema to deserialize to
* @return deserialized typed data
*/
public T deserializeOffset(String connector, byte[] data, Schema schema);
@Override
public void close();
}

View File

@ -1,48 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.storage;
import java.io.Closeable;
import java.util.Map;
/**
* Serializer for Copycat offsets.
* @param <T> native type of offsets.
*/
public interface OffsetSerializer<T> extends Closeable {
/**
* Configure this class.
* @param configs configs in key/value pairs
* @param isKey whether is for key or value
*/
public void configure(Map<String, ?> configs, boolean isKey);
/**
* @param connector the connector associated with offsets
* @param data typed data
* @return serialized bytes
*/
public byte[] serializeOffset(String connector, T data);
/**
* Close this serializer.
*/
@Override
public void close();
}

View File

@ -17,8 +17,6 @@
package org.apache.kafka.copycat.storage; package org.apache.kafka.copycat.storage;
import org.apache.kafka.copycat.data.Schema;
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
@ -33,15 +31,14 @@ public interface OffsetStorageReader {
* gets it from the backing store, which may require some network round trips. * gets it from the backing store, which may require some network round trips.
* *
* @param stream object uniquely identifying the stream of data * @param stream object uniquely identifying the stream of data
* @param schema schema used to decode the offset value
* @return object uniquely identifying the offset in the stream of data * @return object uniquely identifying the offset in the stream of data
*/ */
public Object getOffset(Object stream, Schema schema); public Object getOffset(Object stream);
/** /**
* <p> * <p>
* Get a set of offsets for the specified stream identifiers. This may be more efficient * Get a set of offsets for the specified stream identifiers. This may be more efficient
* than calling {@link #getOffset(Object, Schema)} repeatedly. * than calling {@link #getOffset(Object)} repeatedly.
* </p> * </p>
* <p> * <p>
* Note that when errors occur, this method omits the associated data and tries to return as * Note that when errors occur, this method omits the associated data and tries to return as
@ -53,8 +50,7 @@ public interface OffsetStorageReader {
* </p> * </p>
* *
* @param streams set of identifiers for streams of data * @param streams set of identifiers for streams of data
* @param schema schema used to decode offset values
* @return a map of stream identifiers to decoded offsets * @return a map of stream identifiers to decoded offsets
*/ */
public Map<Object, Object> getOffsets(Collection<Object> streams, Schema schema); public Map<Object, Object> getOffsets(Collection<Object> streams);
} }

View File

@ -1,50 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.avro;
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.copycat.data.Schema;
import org.apache.kafka.copycat.storage.OffsetDeserializer;
import java.util.Map;
public class AvroDeserializer extends AbstractKafkaAvroDeserializer implements OffsetDeserializer<Object> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
configure(new KafkaAvroDeserializerConfig(configs));
}
@Override
public Object deserializeOffset(String connector, byte[] data) {
// TODO: Support schema projection
return deserialize(data);
}
@Override
public Object deserializeOffset(String connector, byte[] data, Schema schema) {
// TODO: Support schema projection
return deserialize(data);
}
@Override
public void close() {
}
}

View File

@ -1,66 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.avro;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerializer;
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
import org.apache.kafka.copycat.storage.OffsetSerializer;
import java.util.Map;
public class AvroSerializer extends AbstractKafkaAvroSerializer implements OffsetSerializer<Object> {
private boolean isKey;
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
this.isKey = isKey;
Object url = configs.get(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG);
if (url == null) {
throw new CopycatRuntimeException("Missing Schema registry url!");
}
Object maxSchemaObject = configs.get(
AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_CONFIG);
if (maxSchemaObject == null) {
schemaRegistry = new CachedSchemaRegistryClient(
(String) url, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
} else {
schemaRegistry = new CachedSchemaRegistryClient(
(String) url, (Integer) maxSchemaObject);
}
}
@Override
public byte[] serializeOffset(String connector, Object data) {
String subject;
if (isKey) {
subject = connector + "-key";
} else {
subject = connector + "-value";
}
return serializeImpl(subject, data);
}
@Override
public void close() {
}
}

View File

@ -17,8 +17,6 @@
package org.apache.kafka.copycat.file; package org.apache.kafka.copycat.file;
import org.apache.kafka.copycat.data.Schema;
import org.apache.kafka.copycat.data.SchemaBuilder;
import org.apache.kafka.copycat.errors.CopycatException; import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.errors.CopycatRuntimeException; import org.apache.kafka.copycat.errors.CopycatRuntimeException;
import org.apache.kafka.copycat.source.SourceRecord; import org.apache.kafka.copycat.source.SourceRecord;
@ -55,9 +53,7 @@ public class FileStreamSourceTask extends SourceTask<Object, Object> {
} else { } else {
try { try {
stream = new FileInputStream(filename); stream = new FileInputStream(filename);
Schema longSchema = SchemaBuilder.builder().longType(); Long lastRecordedOffset = (Long) context.getOffsetStorageReader().getOffset(null);
Long lastRecordedOffset
= (Long) context.getOffsetStorageReader().getOffset(null, longSchema);
if (lastRecordedOffset != null) { if (lastRecordedOffset != null) {
log.debug("Found previous offset, trying to skip to file offset {}", lastRecordedOffset); log.debug("Found previous offset, trying to skip to file offset {}", lastRecordedOffset);
long skipLeft = lastRecordedOffset; long skipLeft = lastRecordedOffset;

View File

@ -17,7 +17,6 @@
package org.apache.kafka.copycat.file; package org.apache.kafka.copycat.file;
import org.apache.kafka.copycat.data.Schema;
import org.apache.kafka.copycat.errors.CopycatException; import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.errors.CopycatRuntimeException; import org.apache.kafka.copycat.errors.CopycatRuntimeException;
import org.apache.kafka.copycat.source.SourceRecord; import org.apache.kafka.copycat.source.SourceRecord;
@ -137,8 +136,7 @@ public class FileStreamSourceTaskTest {
private void expectOffsetLookupReturnNone() { private void expectOffsetLookupReturnNone() {
EasyMock.expect( EasyMock.expect(
offsetStorageReader.getOffset( offsetStorageReader.getOffset(EasyMock.anyObject(Object.class)))
EasyMock.anyObject(Object.class), EasyMock.anyObject(Schema.class)))
.andReturn(null); .andReturn(null);
} }
} }

View File

@ -52,58 +52,22 @@ public class WorkerConfig extends AbstractConfig {
public static final String CONVERTER_CLASS_CONFIG = "converter"; public static final String CONVERTER_CLASS_CONFIG = "converter";
public static final String CONVERTER_CLASS_DOC = public static final String CONVERTER_CLASS_DOC =
"Converter class for Copycat data that implements the <code>Converter</code> interface."; "Converter class for Copycat data that implements the <code>Converter</code> interface.";
public static final String CONVERTER_CLASS_DEFAULT
= "org.apache.kafka.copycat.avro.AvroConverter"; // TODO: Non-avro built-in?
public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer"; public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
public static final String KEY_SERIALIZER_CLASS_DOC = public static final String KEY_SERIALIZER_CLASS_DOC =
"Serializer class for key that implements the <code>Serializer</code> interface."; "Serializer class for key that implements the <code>Serializer</code> interface.";
public static final String KEY_SERIALIZER_CLASS_DEFAULT
= "io.confluent.kafka.serializers.KafkaAvroSerializer"; // TODO: Non-avro built-in?
public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer"; public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
public static final String VALUE_SERIALIZER_CLASS_DOC = public static final String VALUE_SERIALIZER_CLASS_DOC =
"Serializer class for value that implements the <code>Serializer</code> interface."; "Serializer class for value that implements the <code>Serializer</code> interface.";
public static final String VALUE_SERIALIZER_CLASS_DEFAULT
= "io.confluent.kafka.serializers.KafkaAvroSerializer"; // TODO: Non-avro built-in?
public static final String OFFSET_KEY_SERIALIZER_CLASS_CONFIG = "offset.key.serializer";
public static final String OFFSET_KEY_SERIALIZER_CLASS_DOC =
"Serializer class for key that implements the <code>OffsetSerializer</code> interface.";
public static final String OFFSET_KEY_SERIALIZER_CLASS_DEFAULT
= "org.apache.kafka.copycat.avro.AvroSerializer"; // TODO: Non-avro built-in?
public static final String OFFSET_VALUE_SERIALIZER_CLASS_CONFIG = "offset.value.serializer";
public static final String OFFSET_VALUE_SERIALIZER_CLASS_DOC =
"Serializer class for value that implements the <code>OffsetSerializer</code> interface.";
public static final String OFFSET_VALUE_SERIALIZER_CLASS_DEFAULT
= "org.apache.kafka.copycat.avro.AvroSerializer"; // TODO: Non-avro built-in?
public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer"; public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";
public static final String KEY_DESERIALIZER_CLASS_DOC = public static final String KEY_DESERIALIZER_CLASS_DOC =
"Serializer class for key that implements the <code>Deserializer</code> interface."; "Serializer class for key that implements the <code>Deserializer</code> interface.";
public static final String KEY_DESERIALIZER_CLASS_DEFAULT
= "io.confluent.kafka.serializers.KafkaAvroDeserializer"; // TODO: Non-avro built-in?
public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer"; public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";
public static final String VALUE_DESERIALIZER_CLASS_DOC = public static final String VALUE_DESERIALIZER_CLASS_DOC =
"Deserializer class for value that implements the <code>Deserializer</code> interface."; "Deserializer class for value that implements the <code>Deserializer</code> interface.";
public static final String VALUE_DESERIALIZER_CLASS_DEFAULT
= "io.confluent.kafka.serializers.KafkaAvroDeserializer"; // TODO: Non-avro built-in?
public static final String OFFSET_KEY_DESERIALIZER_CLASS_CONFIG = "offset.key.deserializer";
public static final String OFFSET_KEY_DESERIALIZER_CLASS_DOC =
"Deserializer class for key that implements the <code>OffsetDeserializer</code> interface.";
public static final String OFFSET_KEY_DESERIALIZER_CLASS_DEFAULT
= "org.apache.kafka.copycat.avro.AvroDeserializer"; // TODO: Non-avro built-in?
public static final String OFFSET_VALUE_DESERIALIZER_CLASS_CONFIG = "offset.value.deserializer";
public static final String OFFSET_VALUE_DESERIALIZER_CLASS_DOC =
"Deserializer class for value that implements the <code>OffsetDeserializer</code> interface.";
public static final String OFFSET_VALUE_DESERIALIZER_CLASS_DEFAULT
= "org.apache.kafka.copycat.avro.AvroDeserializer"; // TODO: Non-avro built-in?
public static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG public static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG
= "task.shutdown.graceful.timeout.ms"; = "task.shutdown.graceful.timeout.ms";
@ -137,27 +101,16 @@ public class WorkerConfig extends AbstractConfig {
.define(CLUSTER_CONFIG, Type.STRING, CLUSTER_DEFAULT, Importance.HIGH, CLUSTER_CONFIG_DOC) .define(CLUSTER_CONFIG, Type.STRING, CLUSTER_DEFAULT, Importance.HIGH, CLUSTER_CONFIG_DOC)
.define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, BOOTSTRAP_SERVERS_DEFAULT, .define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, BOOTSTRAP_SERVERS_DEFAULT,
Importance.HIGH, BOOSTRAP_SERVERS_DOC) Importance.HIGH, BOOSTRAP_SERVERS_DOC)
.define(CONVERTER_CLASS_CONFIG, Type.CLASS, CONVERTER_CLASS_DEFAULT, .define(CONVERTER_CLASS_CONFIG, Type.CLASS,
Importance.HIGH, CONVERTER_CLASS_DOC) Importance.HIGH, CONVERTER_CLASS_DOC)
.define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, KEY_SERIALIZER_CLASS_DEFAULT, .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
Importance.HIGH, KEY_SERIALIZER_CLASS_DOC) Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
.define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, VALUE_SERIALIZER_CLASS_DEFAULT, .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC) Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC)
.define(OFFSET_KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, OFFSET_KEY_SERIALIZER_CLASS_DEFAULT, .define(KEY_DESERIALIZER_CLASS_CONFIG, Type.CLASS,
Importance.HIGH, OFFSET_KEY_SERIALIZER_CLASS_DOC)
.define(OFFSET_VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
OFFSET_VALUE_SERIALIZER_CLASS_DEFAULT,
Importance.HIGH, OFFSET_VALUE_SERIALIZER_CLASS_DOC)
.define(KEY_DESERIALIZER_CLASS_CONFIG, Type.CLASS, KEY_DESERIALIZER_CLASS_DEFAULT,
Importance.HIGH, KEY_DESERIALIZER_CLASS_DOC) Importance.HIGH, KEY_DESERIALIZER_CLASS_DOC)
.define(VALUE_DESERIALIZER_CLASS_CONFIG, Type.CLASS, VALUE_DESERIALIZER_CLASS_DEFAULT, .define(VALUE_DESERIALIZER_CLASS_CONFIG, Type.CLASS,
Importance.HIGH, VALUE_DESERIALIZER_CLASS_DOC) Importance.HIGH, VALUE_DESERIALIZER_CLASS_DOC)
.define(OFFSET_KEY_DESERIALIZER_CLASS_CONFIG, Type.CLASS,
OFFSET_KEY_DESERIALIZER_CLASS_DEFAULT,
Importance.HIGH, OFFSET_KEY_DESERIALIZER_CLASS_DOC)
.define(OFFSET_VALUE_DESERIALIZER_CLASS_CONFIG, Type.CLASS,
OFFSET_VALUE_DESERIALIZER_CLASS_DEFAULT,
Importance.HIGH, OFFSET_VALUE_DESERIALIZER_CLASS_DOC)
.define(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Type.LONG, .define(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Type.LONG,
TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT, Importance.LOW, TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT, Importance.LOW,
TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC) TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC)

View File

@ -17,6 +17,8 @@
package org.apache.kafka.copycat.runtime; package org.apache.kafka.copycat.runtime;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
@ -54,10 +56,10 @@ public class Worker {
private WorkerConfig config; private WorkerConfig config;
private Converter converter; private Converter converter;
private OffsetBackingStore offsetBackingStore; private OffsetBackingStore offsetBackingStore;
private OffsetSerializer offsetKeySerializer; private Serializer offsetKeySerializer;
private OffsetSerializer offsetValueSerializer; private Serializer offsetValueSerializer;
private OffsetDeserializer offsetKeyDeserializer; private Deserializer offsetKeyDeserializer;
private OffsetDeserializer offsetValueDeserializer; private Deserializer offsetValueDeserializer;
private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<ConnectorTaskId, WorkerTask>(); private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<ConnectorTaskId, WorkerTask>();
private KafkaProducer producer; private KafkaProducer producer;
private SourceTaskOffsetCommitter sourceTaskOffsetCommitter; private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
@ -71,8 +73,8 @@ public class Worker {
} }
public Worker(Time time, WorkerConfig config, OffsetBackingStore offsetBackingStore, public Worker(Time time, WorkerConfig config, OffsetBackingStore offsetBackingStore,
OffsetSerializer offsetKeySerializer, OffsetSerializer offsetValueSerializer, Serializer offsetKeySerializer, Serializer offsetValueSerializer,
OffsetDeserializer offsetKeyDeserializer, OffsetDeserializer offsetValueDeserializer) { Deserializer offsetKeyDeserializer, Deserializer offsetValueDeserializer) {
this.time = time; this.time = time;
this.config = config; this.config = config;
this.converter = Reflection.instantiate(config.getClass(WorkerConfig.CONVERTER_CLASS_CONFIG).getName(), this.converter = Reflection.instantiate(config.getClass(WorkerConfig.CONVERTER_CLASS_CONFIG).getName(),
@ -83,8 +85,7 @@ public class Worker {
this.offsetKeySerializer = offsetKeySerializer; this.offsetKeySerializer = offsetKeySerializer;
} else { } else {
this.offsetKeySerializer = Reflection.instantiate( this.offsetKeySerializer = Reflection.instantiate(
config.getClass(WorkerConfig.OFFSET_KEY_SERIALIZER_CLASS_CONFIG).getName(), config.getClass(WorkerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName(), Serializer.class);
OffsetSerializer.class);
this.offsetKeySerializer.configure(config.getOriginalProperties(), true); this.offsetKeySerializer.configure(config.getOriginalProperties(), true);
} }
@ -92,8 +93,7 @@ public class Worker {
this.offsetValueSerializer = offsetValueSerializer; this.offsetValueSerializer = offsetValueSerializer;
} else { } else {
this.offsetValueSerializer = Reflection.instantiate( this.offsetValueSerializer = Reflection.instantiate(
config.getClass(WorkerConfig.OFFSET_VALUE_SERIALIZER_CLASS_CONFIG).getName(), config.getClass(WorkerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName(), Serializer.class);
OffsetSerializer.class);
this.offsetValueSerializer.configure(config.getOriginalProperties(), false); this.offsetValueSerializer.configure(config.getOriginalProperties(), false);
} }
@ -101,8 +101,7 @@ public class Worker {
this.offsetKeyDeserializer = offsetKeyDeserializer; this.offsetKeyDeserializer = offsetKeyDeserializer;
} else { } else {
this.offsetKeyDeserializer = Reflection.instantiate( this.offsetKeyDeserializer = Reflection.instantiate(
config.getClass(WorkerConfig.OFFSET_KEY_DESERIALIZER_CLASS_CONFIG).getName(), config.getClass(WorkerConfig.KEY_DESERIALIZER_CLASS_CONFIG).getName(), Deserializer.class);
OffsetDeserializer.class);
this.offsetKeyDeserializer.configure(config.getOriginalProperties(), true); this.offsetKeyDeserializer.configure(config.getOriginalProperties(), true);
} }
@ -110,8 +109,7 @@ public class Worker {
this.offsetValueDeserializer = offsetValueDeserializer; this.offsetValueDeserializer = offsetValueDeserializer;
} else { } else {
this.offsetValueDeserializer = Reflection.instantiate( this.offsetValueDeserializer = Reflection.instantiate(
config.getClass(WorkerConfig.OFFSET_VALUE_DESERIALIZER_CLASS_CONFIG).getName(), config.getClass(WorkerConfig.VALUE_DESERIALIZER_CLASS_CONFIG).getName(), Deserializer.class);
OffsetDeserializer.class);
this.offsetValueDeserializer.configure(config.getOriginalProperties(), false); this.offsetValueDeserializer.configure(config.getOriginalProperties(), false);
} }
} }

View File

@ -17,7 +17,8 @@
package org.apache.kafka.copycat.storage; package org.apache.kafka.copycat.storage;
import org.apache.kafka.copycat.data.Schema; import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.copycat.errors.CopycatRuntimeException; import org.apache.kafka.copycat.errors.CopycatRuntimeException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -39,13 +40,13 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
private final OffsetBackingStore backingStore; private final OffsetBackingStore backingStore;
private final String namespace; private final String namespace;
private final Converter converter; private final Converter converter;
private final OffsetSerializer keySerializer; private final Serializer keySerializer;
private final OffsetDeserializer valueDeserializer; private final Deserializer valueDeserializer;
public OffsetStorageReaderImpl(OffsetBackingStore backingStore, String namespace, public OffsetStorageReaderImpl(OffsetBackingStore backingStore, String namespace,
Converter converter, Converter converter,
OffsetSerializer keySerializer, Serializer keySerializer,
OffsetDeserializer valueDeserializer) { Deserializer valueDeserializer) {
this.backingStore = backingStore; this.backingStore = backingStore;
this.namespace = namespace; this.namespace = namespace;
this.converter = converter; this.converter = converter;
@ -54,18 +55,17 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
} }
@Override @Override
public Object getOffset(Object stream, Schema schema) { public Object getOffset(Object stream) {
return getOffsets(Arrays.asList(stream), schema).get(stream); return getOffsets(Arrays.asList(stream)).get(stream);
} }
@Override @Override
public Map<Object, Object> getOffsets(Collection<Object> streams, Schema schema) { public Map<Object, Object> getOffsets(Collection<Object> streams) {
// Serialize keys so backing store can work with them // Serialize keys so backing store can work with them
Map<ByteBuffer, Object> serializedToOriginal = new HashMap<ByteBuffer, Object>(streams.size()); Map<ByteBuffer, Object> serializedToOriginal = new HashMap<ByteBuffer, Object>(streams.size());
for (Object key : streams) { for (Object key : streams) {
try { try {
byte[] keySerialized = keySerializer.serializeOffset(namespace, byte[] keySerialized = keySerializer.serialize(namespace, converter.fromCopycatData(key));
converter.fromCopycatData(key));
ByteBuffer keyBuffer = (keySerialized != null) ? ByteBuffer.wrap(keySerialized) : null; ByteBuffer keyBuffer = (keySerialized != null) ? ByteBuffer.wrap(keySerialized) : null;
serializedToOriginal.put(keyBuffer, key); serializedToOriginal.put(keyBuffer, key);
} catch (Throwable t) { } catch (Throwable t) {
@ -96,7 +96,7 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
} }
Object origKey = serializedToOriginal.get(rawEntry.getKey()); Object origKey = serializedToOriginal.get(rawEntry.getKey());
Object deserializedValue = converter.toCopycatData( Object deserializedValue = converter.toCopycatData(
valueDeserializer.deserializeOffset(namespace, rawEntry.getValue().array(), schema) valueDeserializer.deserialize(namespace, rawEntry.getValue().array())
); );
result.put(origKey, deserializedValue); result.put(origKey, deserializedValue);

View File

@ -17,6 +17,7 @@
package org.apache.kafka.copycat.storage; package org.apache.kafka.copycat.storage;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.copycat.errors.CopycatRuntimeException; import org.apache.kafka.copycat.errors.CopycatRuntimeException;
import org.apache.kafka.copycat.util.Callback; import org.apache.kafka.copycat.util.Callback;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -67,8 +68,8 @@ public class OffsetStorageWriter {
private final OffsetBackingStore backingStore; private final OffsetBackingStore backingStore;
private final Converter converter; private final Converter converter;
private final OffsetSerializer keySerializer; private final Serializer keySerializer;
private final OffsetSerializer valueSerializer; private final Serializer valueSerializer;
private final String namespace; private final String namespace;
private Map<Object, Object> data = new HashMap<Object, Object>(); private Map<Object, Object> data = new HashMap<Object, Object>();
@ -79,7 +80,7 @@ public class OffsetStorageWriter {
public OffsetStorageWriter(OffsetBackingStore backingStore, public OffsetStorageWriter(OffsetBackingStore backingStore,
String namespace, Converter converter, String namespace, Converter converter,
OffsetSerializer keySerializer, OffsetSerializer valueSerializer) { Serializer keySerializer, Serializer valueSerializer) {
this.backingStore = backingStore; this.backingStore = backingStore;
this.namespace = namespace; this.namespace = namespace;
this.converter = converter; this.converter = converter;
@ -134,11 +135,9 @@ public class OffsetStorageWriter {
try { try {
offsetsSerialized = new HashMap<ByteBuffer, ByteBuffer>(); offsetsSerialized = new HashMap<ByteBuffer, ByteBuffer>();
for (Map.Entry<Object, Object> entry : toFlush.entrySet()) { for (Map.Entry<Object, Object> entry : toFlush.entrySet()) {
byte[] key = keySerializer.serializeOffset(namespace, byte[] key = keySerializer.serialize(namespace, converter.fromCopycatData(entry.getKey()));
converter.fromCopycatData(entry.getKey()));
ByteBuffer keyBuffer = (key != null) ? ByteBuffer.wrap(key) : null; ByteBuffer keyBuffer = (key != null) ? ByteBuffer.wrap(key) : null;
byte[] value = valueSerializer.serializeOffset(namespace, byte[] value = valueSerializer.serialize(namespace, converter.fromCopycatData(entry.getValue()));
converter.fromCopycatData(entry.getValue()));
ByteBuffer valueBuffer = (value != null) ? ByteBuffer.wrap(value) : null; ByteBuffer valueBuffer = (value != null) ? ByteBuffer.wrap(value) : null;
offsetsSerialized.put(keyBuffer, valueBuffer); offsetsSerialized.put(keyBuffer, valueBuffer);
} }

View File

@ -76,7 +76,14 @@ public class WorkerSinkTaskTest extends ThreadedTest {
super.setup(); super.setup();
time = new MockTime(); time = new MockTime();
sinkTask = PowerMock.createMock(SinkTask.class); sinkTask = PowerMock.createMock(SinkTask.class);
workerConfig = new WorkerConfig(); Properties workerProps = new Properties();
// TODO: Non-avro built-ins?
workerProps.setProperty("converter", "org.apache.kafka.copycat.avro.AvroConverter");
workerProps.setProperty("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
workerProps.setProperty("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
workerProps.setProperty("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
workerProps.setProperty("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
workerConfig = new WorkerConfig(workerProps);
converter = PowerMock.createMock(Converter.class); converter = PowerMock.createMock(Converter.class);
workerTask = PowerMock.createPartialMock( workerTask = PowerMock.createPartialMock(
WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"}, WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"},

View File

@ -66,7 +66,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
private static final String CONVERTED_RECORD = "converted-record"; private static final String CONVERTED_RECORD = "converted-record";
private ConnectorTaskId taskId = new ConnectorTaskId("job", 0); private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
private WorkerConfig config = new WorkerConfig(); private WorkerConfig config;
private SourceTask sourceTask; private SourceTask sourceTask;
private Converter converter; private Converter converter;
private KafkaProducer<Object, Object> producer; private KafkaProducer<Object, Object> producer;
@ -86,6 +86,14 @@ public class WorkerSourceTaskTest extends ThreadedTest {
@Override @Override
public void setup() { public void setup() {
super.setup(); super.setup();
Properties workerProps = new Properties();
// TODO: Non-avro built-ins?
workerProps.setProperty("converter", "org.apache.kafka.copycat.avro.AvroConverter");
workerProps.setProperty("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
workerProps.setProperty("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
workerProps.setProperty("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
workerProps.setProperty("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
config = new WorkerConfig(workerProps);
sourceTask = PowerMock.createMock(SourceTask.class); sourceTask = PowerMock.createMock(SourceTask.class);
converter = PowerMock.createMock(Converter.class); converter = PowerMock.createMock(Converter.class);
producer = PowerMock.createMock(KafkaProducer.class); producer = PowerMock.createMock(KafkaProducer.class);

View File

@ -17,6 +17,8 @@
package org.apache.kafka.copycat.runtime; package org.apache.kafka.copycat.runtime;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Time;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.copycat.cli.WorkerConfig; import org.apache.kafka.copycat.cli.WorkerConfig;
@ -48,18 +50,24 @@ public class WorkerTest extends ThreadedTest {
private ConnectorTaskId taskId = new ConnectorTaskId("job", 0); private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
private Worker worker; private Worker worker;
private OffsetBackingStore offsetBackingStore = PowerMock.createMock(OffsetBackingStore.class); private OffsetBackingStore offsetBackingStore = PowerMock.createMock(OffsetBackingStore.class);
private OffsetSerializer offsetKeySerializer = PowerMock.createMock(OffsetSerializer.class); private Serializer offsetKeySerializer = PowerMock.createMock(Serializer.class);
private OffsetSerializer offsetValueSerializer = PowerMock.createMock(OffsetSerializer.class); private Serializer offsetValueSerializer = PowerMock.createMock(Serializer.class);
private OffsetDeserializer offsetKeyDeserializer = PowerMock.createMock(OffsetDeserializer.class); private Deserializer offsetKeyDeserializer = PowerMock.createMock(Deserializer.class);
private OffsetDeserializer offsetValueDeserializer = PowerMock.createMock(OffsetDeserializer.class); private Deserializer offsetValueDeserializer = PowerMock.createMock(Deserializer.class);
@Before @Before
public void setup() { public void setup() {
super.setup(); super.setup();
// TODO: Remove schema registry URL // TODO: Remove schema registry URL
// TODO: Non-avro built-ins?
Properties workerProps = new Properties(); Properties workerProps = new Properties();
workerProps.setProperty("schema.registry.url", "http://localhost:8081"); workerProps.setProperty("schema.registry.url", "http://localhost:8081");
workerProps.setProperty("converter", "org.apache.kafka.copycat.avro.AvroConverter");
workerProps.setProperty("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
workerProps.setProperty("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
workerProps.setProperty("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
workerProps.setProperty("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
WorkerConfig config = new WorkerConfig(workerProps); WorkerConfig config = new WorkerConfig(workerProps);
worker = new Worker(new MockTime(), config, offsetBackingStore, worker = new Worker(new MockTime(), config, offsetBackingStore,
offsetKeySerializer, offsetValueSerializer, offsetKeySerializer, offsetValueSerializer,

View File

@ -17,6 +17,7 @@
package org.apache.kafka.copycat.storage; package org.apache.kafka.copycat.storage;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.copycat.errors.CopycatRuntimeException; import org.apache.kafka.copycat.errors.CopycatRuntimeException;
import org.apache.kafka.copycat.util.Callback; import org.apache.kafka.copycat.util.Callback;
import org.easymock.Capture; import org.easymock.Capture;
@ -49,8 +50,8 @@ public class OffsetStorageWriterTest {
private OffsetBackingStore store; private OffsetBackingStore store;
private Converter converter; private Converter converter;
private OffsetSerializer keySerializer; private Serializer keySerializer;
private OffsetSerializer valueSerializer; private Serializer valueSerializer;
private OffsetStorageWriter writer; private OffsetStorageWriter writer;
private static Exception exception = new RuntimeException("error"); private static Exception exception = new RuntimeException("error");
@ -61,8 +62,8 @@ public class OffsetStorageWriterTest {
public void setup() { public void setup() {
store = PowerMock.createMock(OffsetBackingStore.class); store = PowerMock.createMock(OffsetBackingStore.class);
converter = PowerMock.createMock(Converter.class); converter = PowerMock.createMock(Converter.class);
keySerializer = PowerMock.createMock(OffsetSerializer.class); keySerializer = PowerMock.createMock(Serializer.class);
valueSerializer = PowerMock.createMock(OffsetSerializer.class); valueSerializer = PowerMock.createMock(Serializer.class);
writer = new OffsetStorageWriter(store, NAMESPACE, converter, keySerializer, valueSerializer); writer = new OffsetStorageWriter(store, NAMESPACE, converter, keySerializer, valueSerializer);
service = Executors.newFixedThreadPool(1); service = Executors.newFixedThreadPool(1);
@ -193,9 +194,9 @@ public class OffsetStorageWriterTest {
final boolean fail, final boolean fail,
final CountDownLatch waitForCompletion) { final CountDownLatch waitForCompletion) {
EasyMock.expect(converter.fromCopycatData(OFFSET_KEY)).andReturn(OFFSET_KEY_CONVERTED); EasyMock.expect(converter.fromCopycatData(OFFSET_KEY)).andReturn(OFFSET_KEY_CONVERTED);
EasyMock.expect(keySerializer.serializeOffset(NAMESPACE, OFFSET_KEY_CONVERTED)).andReturn(OFFSET_KEY_SERIALIZED); EasyMock.expect(keySerializer.serialize(NAMESPACE, OFFSET_KEY_CONVERTED)).andReturn(OFFSET_KEY_SERIALIZED);
EasyMock.expect(converter.fromCopycatData(OFFSET_VALUE)).andReturn(OFFSET_VALUE_CONVERTED); EasyMock.expect(converter.fromCopycatData(OFFSET_VALUE)).andReturn(OFFSET_VALUE_CONVERTED);
EasyMock.expect(valueSerializer.serializeOffset(NAMESPACE, OFFSET_VALUE_CONVERTED)).andReturn(OFFSET_VALUE_SERIALIZED); EasyMock.expect(valueSerializer.serialize(NAMESPACE, OFFSET_VALUE_CONVERTED)).andReturn(OFFSET_VALUE_SERIALIZED);
final Capture<Callback<Void>> storeCallback = Capture.newInstance(); final Capture<Callback<Void>> storeCallback = Capture.newInstance();
EasyMock.expect(store.set(EasyMock.eq(NAMESPACE), EasyMock.eq(OFFSETS_SERIALIZED), EasyMock.expect(store.set(EasyMock.eq(NAMESPACE), EasyMock.eq(OFFSETS_SERIALIZED),