From 5a618c68151ab97fda46508f850c28b2eb6f5b48 Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Wed, 29 Jul 2015 01:28:49 -0700 Subject: [PATCH] Remove offset serializers, instead reusing the existing serializers and removing schema projection support. --- checkstyle/import-control.xml | 1 + ...copycat-worker-local-persistent.properties | 7 ++ config/copycat-worker-local.properties | 9 ++- .../copycat/storage/OffsetDeserializer.java | 58 ---------------- .../copycat/storage/OffsetSerializer.java | 48 -------------- .../copycat/storage/OffsetStorageReader.java | 10 +-- .../kafka/copycat/avro/AvroDeserializer.java | 50 -------------- .../kafka/copycat/avro/AvroSerializer.java | 66 ------------------- .../copycat/file/FileStreamSourceTask.java | 6 +- .../file/FileStreamSourceTaskTest.java | 4 +- .../kafka/copycat/cli/WorkerConfig.java | 57 ++-------------- .../apache/kafka/copycat/runtime/Worker.java | 26 ++++---- .../storage/OffsetStorageReaderImpl.java | 22 +++---- .../copycat/storage/OffsetStorageWriter.java | 13 ++-- .../copycat/runtime/WorkerSinkTaskTest.java | 9 ++- .../copycat/runtime/WorkerSourceTaskTest.java | 10 ++- .../kafka/copycat/runtime/WorkerTest.java | 16 +++-- .../storage/OffsetStorageWriterTest.java | 13 ++-- 18 files changed, 91 insertions(+), 334 deletions(-) delete mode 100644 copycat-api/src/main/java/org/apache/kafka/copycat/storage/OffsetDeserializer.java delete mode 100644 copycat-api/src/main/java/org/apache/kafka/copycat/storage/OffsetSerializer.java delete mode 100644 copycat-avro/src/main/java/org/apache/kafka/copycat/avro/AvroDeserializer.java delete mode 100644 copycat-avro/src/main/java/org/apache/kafka/copycat/avro/AvroSerializer.java diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 9d148616a84..52dab63dd8f 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -145,6 +145,7 @@ + diff --git a/config/copycat-worker-local-persistent.properties b/config/copycat-worker-local-persistent.properties index 8db9d6102f3..a55663377f4 100644 --- a/config/copycat-worker-local-persistent.properties +++ b/config/copycat-worker-local-persistent.properties @@ -16,6 +16,13 @@ bootstrap.servers=localhost:9092 schema.registry.url=http://localhost:8081 +# TODO: Non-avro built-ins? +converter=org.apache.kafka.copycat.avro.AvroConverter +key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer +value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer +key.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer +value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer + offset.storage.class=org.apache.kafka.copycat.storage.FileOffsetBackingStore offset.storage.file.filename=/tmp/copycat.offsets # Flush much faster than normal, which is useful for testing/debugging diff --git a/config/copycat-worker-local.properties b/config/copycat-worker-local.properties index e64da4b5215..9e6c81f8bc2 100644 --- a/config/copycat-worker-local.properties +++ b/config/copycat-worker-local.properties @@ -15,4 +15,11 @@ # These are defaults. This file just demonstrates how to override some settings. bootstrap.servers=localhost:9092 -schema.registry.url=http://localhost:8081 \ No newline at end of file +schema.registry.url=http://localhost:8081 + +# TODO: Non-avro built-ins? +converter=org.apache.kafka.copycat.avro.AvroConverter +key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer +value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer +key.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer +value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer \ No newline at end of file diff --git a/copycat-api/src/main/java/org/apache/kafka/copycat/storage/OffsetDeserializer.java b/copycat-api/src/main/java/org/apache/kafka/copycat/storage/OffsetDeserializer.java deleted file mode 100644 index 41a35bb7f31..00000000000 --- a/copycat-api/src/main/java/org/apache/kafka/copycat/storage/OffsetDeserializer.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.storage; - - -import org.apache.kafka.copycat.data.Schema; - -import java.io.Closeable; -import java.util.Map; - -/** - * Deserializer for Copycat offsets. - * @param - */ -public interface OffsetDeserializer extends Closeable { - - /** - * Configure this class. - * @param configs configs in key/value pairs - * @param isKey whether is for key or value - */ - public void configure(Map configs, boolean isKey); - - /** - * Deserialize an offset key or value from the specified connector. - * @param connector connector associated with the data - * @param data serialized bytes - * @return deserialized typed data - */ - public T deserializeOffset(String connector, byte[] data); - - /** - * Deserialize an offset key or value from the specified connector using a schema. - * @param connector connector associated with the data - * @param data serialized bytes - * @param schema schema to deserialize to - * @return deserialized typed data - */ - public T deserializeOffset(String connector, byte[] data, Schema schema); - - @Override - public void close(); -} diff --git a/copycat-api/src/main/java/org/apache/kafka/copycat/storage/OffsetSerializer.java b/copycat-api/src/main/java/org/apache/kafka/copycat/storage/OffsetSerializer.java deleted file mode 100644 index e34a317c264..00000000000 --- a/copycat-api/src/main/java/org/apache/kafka/copycat/storage/OffsetSerializer.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.storage; - -import java.io.Closeable; -import java.util.Map; - -/** - * Serializer for Copycat offsets. - * @param native type of offsets. - */ -public interface OffsetSerializer extends Closeable { - /** - * Configure this class. - * @param configs configs in key/value pairs - * @param isKey whether is for key or value - */ - public void configure(Map configs, boolean isKey); - - /** - * @param connector the connector associated with offsets - * @param data typed data - * @return serialized bytes - */ - public byte[] serializeOffset(String connector, T data); - - /** - * Close this serializer. - */ - @Override - public void close(); - -} diff --git a/copycat-api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java b/copycat-api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java index f1e7862c514..0cc296d66c7 100644 --- a/copycat-api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java +++ b/copycat-api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java @@ -17,8 +17,6 @@ package org.apache.kafka.copycat.storage; -import org.apache.kafka.copycat.data.Schema; - import java.util.Collection; import java.util.Map; @@ -33,15 +31,14 @@ public interface OffsetStorageReader { * gets it from the backing store, which may require some network round trips. * * @param stream object uniquely identifying the stream of data - * @param schema schema used to decode the offset value * @return object uniquely identifying the offset in the stream of data */ - public Object getOffset(Object stream, Schema schema); + public Object getOffset(Object stream); /** *

* Get a set of offsets for the specified stream identifiers. This may be more efficient - * than calling {@link #getOffset(Object, Schema)} repeatedly. + * than calling {@link #getOffset(Object)} repeatedly. *

*

* Note that when errors occur, this method omits the associated data and tries to return as @@ -53,8 +50,7 @@ public interface OffsetStorageReader { *

* * @param streams set of identifiers for streams of data - * @param schema schema used to decode offset values * @return a map of stream identifiers to decoded offsets */ - public Map getOffsets(Collection streams, Schema schema); + public Map getOffsets(Collection streams); } diff --git a/copycat-avro/src/main/java/org/apache/kafka/copycat/avro/AvroDeserializer.java b/copycat-avro/src/main/java/org/apache/kafka/copycat/avro/AvroDeserializer.java deleted file mode 100644 index 508842e4afc..00000000000 --- a/copycat-avro/src/main/java/org/apache/kafka/copycat/avro/AvroDeserializer.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.avro; - -import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer; -import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; -import org.apache.kafka.copycat.data.Schema; -import org.apache.kafka.copycat.storage.OffsetDeserializer; - -import java.util.Map; - -public class AvroDeserializer extends AbstractKafkaAvroDeserializer implements OffsetDeserializer { - - @Override - public void configure(Map configs, boolean isKey) { - configure(new KafkaAvroDeserializerConfig(configs)); - } - - @Override - public Object deserializeOffset(String connector, byte[] data) { - // TODO: Support schema projection - return deserialize(data); - } - - @Override - public Object deserializeOffset(String connector, byte[] data, Schema schema) { - // TODO: Support schema projection - return deserialize(data); - } - - @Override - public void close() { - - } -} diff --git a/copycat-avro/src/main/java/org/apache/kafka/copycat/avro/AvroSerializer.java b/copycat-avro/src/main/java/org/apache/kafka/copycat/avro/AvroSerializer.java deleted file mode 100644 index 9268853cb82..00000000000 --- a/copycat-avro/src/main/java/org/apache/kafka/copycat/avro/AvroSerializer.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.avro; - -import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; -import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; -import io.confluent.kafka.serializers.AbstractKafkaAvroSerializer; -import org.apache.kafka.copycat.errors.CopycatRuntimeException; -import org.apache.kafka.copycat.storage.OffsetSerializer; - -import java.util.Map; - -public class AvroSerializer extends AbstractKafkaAvroSerializer implements OffsetSerializer { - - private boolean isKey; - - - @Override - public void configure(Map configs, boolean isKey) { - this.isKey = isKey; - Object url = configs.get(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG); - if (url == null) { - throw new CopycatRuntimeException("Missing Schema registry url!"); - } - Object maxSchemaObject = configs.get( - AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_CONFIG); - if (maxSchemaObject == null) { - schemaRegistry = new CachedSchemaRegistryClient( - (String) url, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT); - } else { - schemaRegistry = new CachedSchemaRegistryClient( - (String) url, (Integer) maxSchemaObject); - } - } - - @Override - public byte[] serializeOffset(String connector, Object data) { - String subject; - if (isKey) { - subject = connector + "-key"; - } else { - subject = connector + "-value"; - } - return serializeImpl(subject, data); - } - - @Override - public void close() { - - } -} diff --git a/copycat-file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java b/copycat-file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java index 0adc476ad4f..44444cbc387 100644 --- a/copycat-file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java +++ b/copycat-file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java @@ -17,8 +17,6 @@ package org.apache.kafka.copycat.file; -import org.apache.kafka.copycat.data.Schema; -import org.apache.kafka.copycat.data.SchemaBuilder; import org.apache.kafka.copycat.errors.CopycatException; import org.apache.kafka.copycat.errors.CopycatRuntimeException; import org.apache.kafka.copycat.source.SourceRecord; @@ -55,9 +53,7 @@ public class FileStreamSourceTask extends SourceTask { } else { try { stream = new FileInputStream(filename); - Schema longSchema = SchemaBuilder.builder().longType(); - Long lastRecordedOffset - = (Long) context.getOffsetStorageReader().getOffset(null, longSchema); + Long lastRecordedOffset = (Long) context.getOffsetStorageReader().getOffset(null); if (lastRecordedOffset != null) { log.debug("Found previous offset, trying to skip to file offset {}", lastRecordedOffset); long skipLeft = lastRecordedOffset; diff --git a/copycat-file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java b/copycat-file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java index 06bd16d1337..aa193fe7303 100644 --- a/copycat-file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java +++ b/copycat-file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java @@ -17,7 +17,6 @@ package org.apache.kafka.copycat.file; -import org.apache.kafka.copycat.data.Schema; import org.apache.kafka.copycat.errors.CopycatException; import org.apache.kafka.copycat.errors.CopycatRuntimeException; import org.apache.kafka.copycat.source.SourceRecord; @@ -137,8 +136,7 @@ public class FileStreamSourceTaskTest { private void expectOffsetLookupReturnNone() { EasyMock.expect( - offsetStorageReader.getOffset( - EasyMock.anyObject(Object.class), EasyMock.anyObject(Schema.class))) + offsetStorageReader.getOffset(EasyMock.anyObject(Object.class))) .andReturn(null); } } \ No newline at end of file diff --git a/copycat-runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java b/copycat-runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java index 9ee377150f2..223336450ab 100644 --- a/copycat-runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java +++ b/copycat-runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java @@ -52,58 +52,22 @@ public class WorkerConfig extends AbstractConfig { public static final String CONVERTER_CLASS_CONFIG = "converter"; public static final String CONVERTER_CLASS_DOC = "Converter class for Copycat data that implements the Converter interface."; - public static final String CONVERTER_CLASS_DEFAULT - = "org.apache.kafka.copycat.avro.AvroConverter"; // TODO: Non-avro built-in? public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer"; public static final String KEY_SERIALIZER_CLASS_DOC = "Serializer class for key that implements the Serializer interface."; - public static final String KEY_SERIALIZER_CLASS_DEFAULT - = "io.confluent.kafka.serializers.KafkaAvroSerializer"; // TODO: Non-avro built-in? public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer"; public static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the Serializer interface."; - public static final String VALUE_SERIALIZER_CLASS_DEFAULT - = "io.confluent.kafka.serializers.KafkaAvroSerializer"; // TODO: Non-avro built-in? - - public static final String OFFSET_KEY_SERIALIZER_CLASS_CONFIG = "offset.key.serializer"; - public static final String OFFSET_KEY_SERIALIZER_CLASS_DOC = - "Serializer class for key that implements the OffsetSerializer interface."; - public static final String OFFSET_KEY_SERIALIZER_CLASS_DEFAULT - = "org.apache.kafka.copycat.avro.AvroSerializer"; // TODO: Non-avro built-in? - - public static final String OFFSET_VALUE_SERIALIZER_CLASS_CONFIG = "offset.value.serializer"; - public static final String OFFSET_VALUE_SERIALIZER_CLASS_DOC = - "Serializer class for value that implements the OffsetSerializer interface."; - public static final String OFFSET_VALUE_SERIALIZER_CLASS_DEFAULT - = "org.apache.kafka.copycat.avro.AvroSerializer"; // TODO: Non-avro built-in? - public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer"; public static final String KEY_DESERIALIZER_CLASS_DOC = "Serializer class for key that implements the Deserializer interface."; - public static final String KEY_DESERIALIZER_CLASS_DEFAULT - = "io.confluent.kafka.serializers.KafkaAvroDeserializer"; // TODO: Non-avro built-in? public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer"; public static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the Deserializer interface."; - public static final String VALUE_DESERIALIZER_CLASS_DEFAULT - = "io.confluent.kafka.serializers.KafkaAvroDeserializer"; // TODO: Non-avro built-in? - - public static final String OFFSET_KEY_DESERIALIZER_CLASS_CONFIG = "offset.key.deserializer"; - public static final String OFFSET_KEY_DESERIALIZER_CLASS_DOC = - "Deserializer class for key that implements the OffsetDeserializer interface."; - public static final String OFFSET_KEY_DESERIALIZER_CLASS_DEFAULT - = "org.apache.kafka.copycat.avro.AvroDeserializer"; // TODO: Non-avro built-in? - - public static final String OFFSET_VALUE_DESERIALIZER_CLASS_CONFIG = "offset.value.deserializer"; - public static final String OFFSET_VALUE_DESERIALIZER_CLASS_DOC = - "Deserializer class for value that implements the OffsetDeserializer interface."; - public static final String OFFSET_VALUE_DESERIALIZER_CLASS_DEFAULT - = "org.apache.kafka.copycat.avro.AvroDeserializer"; // TODO: Non-avro built-in? - public static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG = "task.shutdown.graceful.timeout.ms"; @@ -137,27 +101,16 @@ public class WorkerConfig extends AbstractConfig { .define(CLUSTER_CONFIG, Type.STRING, CLUSTER_DEFAULT, Importance.HIGH, CLUSTER_CONFIG_DOC) .define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, BOOTSTRAP_SERVERS_DEFAULT, Importance.HIGH, BOOSTRAP_SERVERS_DOC) - .define(CONVERTER_CLASS_CONFIG, Type.CLASS, CONVERTER_CLASS_DEFAULT, + .define(CONVERTER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, CONVERTER_CLASS_DOC) - .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, KEY_SERIALIZER_CLASS_DEFAULT, + .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, KEY_SERIALIZER_CLASS_DOC) - .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, VALUE_SERIALIZER_CLASS_DEFAULT, + .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC) - .define(OFFSET_KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, OFFSET_KEY_SERIALIZER_CLASS_DEFAULT, - Importance.HIGH, OFFSET_KEY_SERIALIZER_CLASS_DOC) - .define(OFFSET_VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, - OFFSET_VALUE_SERIALIZER_CLASS_DEFAULT, - Importance.HIGH, OFFSET_VALUE_SERIALIZER_CLASS_DOC) - .define(KEY_DESERIALIZER_CLASS_CONFIG, Type.CLASS, KEY_DESERIALIZER_CLASS_DEFAULT, + .define(KEY_DESERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, KEY_DESERIALIZER_CLASS_DOC) - .define(VALUE_DESERIALIZER_CLASS_CONFIG, Type.CLASS, VALUE_DESERIALIZER_CLASS_DEFAULT, + .define(VALUE_DESERIALIZER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_DESERIALIZER_CLASS_DOC) - .define(OFFSET_KEY_DESERIALIZER_CLASS_CONFIG, Type.CLASS, - OFFSET_KEY_DESERIALIZER_CLASS_DEFAULT, - Importance.HIGH, OFFSET_KEY_DESERIALIZER_CLASS_DOC) - .define(OFFSET_VALUE_DESERIALIZER_CLASS_CONFIG, Type.CLASS, - OFFSET_VALUE_DESERIALIZER_CLASS_DEFAULT, - Importance.HIGH, OFFSET_VALUE_DESERIALIZER_CLASS_DOC) .define(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Type.LONG, TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT, Importance.LOW, TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC) diff --git a/copycat-runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java b/copycat-runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java index c96f11992b8..494a65a43ad 100644 --- a/copycat-runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java +++ b/copycat-runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java @@ -17,6 +17,8 @@ package org.apache.kafka.copycat.runtime; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.clients.producer.KafkaProducer; @@ -54,10 +56,10 @@ public class Worker { private WorkerConfig config; private Converter converter; private OffsetBackingStore offsetBackingStore; - private OffsetSerializer offsetKeySerializer; - private OffsetSerializer offsetValueSerializer; - private OffsetDeserializer offsetKeyDeserializer; - private OffsetDeserializer offsetValueDeserializer; + private Serializer offsetKeySerializer; + private Serializer offsetValueSerializer; + private Deserializer offsetKeyDeserializer; + private Deserializer offsetValueDeserializer; private HashMap tasks = new HashMap(); private KafkaProducer producer; private SourceTaskOffsetCommitter sourceTaskOffsetCommitter; @@ -71,8 +73,8 @@ public class Worker { } public Worker(Time time, WorkerConfig config, OffsetBackingStore offsetBackingStore, - OffsetSerializer offsetKeySerializer, OffsetSerializer offsetValueSerializer, - OffsetDeserializer offsetKeyDeserializer, OffsetDeserializer offsetValueDeserializer) { + Serializer offsetKeySerializer, Serializer offsetValueSerializer, + Deserializer offsetKeyDeserializer, Deserializer offsetValueDeserializer) { this.time = time; this.config = config; this.converter = Reflection.instantiate(config.getClass(WorkerConfig.CONVERTER_CLASS_CONFIG).getName(), @@ -83,8 +85,7 @@ public class Worker { this.offsetKeySerializer = offsetKeySerializer; } else { this.offsetKeySerializer = Reflection.instantiate( - config.getClass(WorkerConfig.OFFSET_KEY_SERIALIZER_CLASS_CONFIG).getName(), - OffsetSerializer.class); + config.getClass(WorkerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName(), Serializer.class); this.offsetKeySerializer.configure(config.getOriginalProperties(), true); } @@ -92,8 +93,7 @@ public class Worker { this.offsetValueSerializer = offsetValueSerializer; } else { this.offsetValueSerializer = Reflection.instantiate( - config.getClass(WorkerConfig.OFFSET_VALUE_SERIALIZER_CLASS_CONFIG).getName(), - OffsetSerializer.class); + config.getClass(WorkerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName(), Serializer.class); this.offsetValueSerializer.configure(config.getOriginalProperties(), false); } @@ -101,8 +101,7 @@ public class Worker { this.offsetKeyDeserializer = offsetKeyDeserializer; } else { this.offsetKeyDeserializer = Reflection.instantiate( - config.getClass(WorkerConfig.OFFSET_KEY_DESERIALIZER_CLASS_CONFIG).getName(), - OffsetDeserializer.class); + config.getClass(WorkerConfig.KEY_DESERIALIZER_CLASS_CONFIG).getName(), Deserializer.class); this.offsetKeyDeserializer.configure(config.getOriginalProperties(), true); } @@ -110,8 +109,7 @@ public class Worker { this.offsetValueDeserializer = offsetValueDeserializer; } else { this.offsetValueDeserializer = Reflection.instantiate( - config.getClass(WorkerConfig.OFFSET_VALUE_DESERIALIZER_CLASS_CONFIG).getName(), - OffsetDeserializer.class); + config.getClass(WorkerConfig.VALUE_DESERIALIZER_CLASS_CONFIG).getName(), Deserializer.class); this.offsetValueDeserializer.configure(config.getOriginalProperties(), false); } } diff --git a/copycat-runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java b/copycat-runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java index 26774e35896..20b9c4fd1eb 100644 --- a/copycat-runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java +++ b/copycat-runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java @@ -17,7 +17,8 @@ package org.apache.kafka.copycat.storage; -import org.apache.kafka.copycat.data.Schema; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.copycat.errors.CopycatRuntimeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,13 +40,13 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader { private final OffsetBackingStore backingStore; private final String namespace; private final Converter converter; - private final OffsetSerializer keySerializer; - private final OffsetDeserializer valueDeserializer; + private final Serializer keySerializer; + private final Deserializer valueDeserializer; public OffsetStorageReaderImpl(OffsetBackingStore backingStore, String namespace, Converter converter, - OffsetSerializer keySerializer, - OffsetDeserializer valueDeserializer) { + Serializer keySerializer, + Deserializer valueDeserializer) { this.backingStore = backingStore; this.namespace = namespace; this.converter = converter; @@ -54,18 +55,17 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader { } @Override - public Object getOffset(Object stream, Schema schema) { - return getOffsets(Arrays.asList(stream), schema).get(stream); + public Object getOffset(Object stream) { + return getOffsets(Arrays.asList(stream)).get(stream); } @Override - public Map getOffsets(Collection streams, Schema schema) { + public Map getOffsets(Collection streams) { // Serialize keys so backing store can work with them Map serializedToOriginal = new HashMap(streams.size()); for (Object key : streams) { try { - byte[] keySerialized = keySerializer.serializeOffset(namespace, - converter.fromCopycatData(key)); + byte[] keySerialized = keySerializer.serialize(namespace, converter.fromCopycatData(key)); ByteBuffer keyBuffer = (keySerialized != null) ? ByteBuffer.wrap(keySerialized) : null; serializedToOriginal.put(keyBuffer, key); } catch (Throwable t) { @@ -96,7 +96,7 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader { } Object origKey = serializedToOriginal.get(rawEntry.getKey()); Object deserializedValue = converter.toCopycatData( - valueDeserializer.deserializeOffset(namespace, rawEntry.getValue().array(), schema) + valueDeserializer.deserialize(namespace, rawEntry.getValue().array()) ); result.put(origKey, deserializedValue); diff --git a/copycat-runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java b/copycat-runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java index b9d4755841b..e512499878a 100644 --- a/copycat-runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java +++ b/copycat-runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java @@ -17,6 +17,7 @@ package org.apache.kafka.copycat.storage; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.copycat.errors.CopycatRuntimeException; import org.apache.kafka.copycat.util.Callback; import org.slf4j.Logger; @@ -67,8 +68,8 @@ public class OffsetStorageWriter { private final OffsetBackingStore backingStore; private final Converter converter; - private final OffsetSerializer keySerializer; - private final OffsetSerializer valueSerializer; + private final Serializer keySerializer; + private final Serializer valueSerializer; private final String namespace; private Map data = new HashMap(); @@ -79,7 +80,7 @@ public class OffsetStorageWriter { public OffsetStorageWriter(OffsetBackingStore backingStore, String namespace, Converter converter, - OffsetSerializer keySerializer, OffsetSerializer valueSerializer) { + Serializer keySerializer, Serializer valueSerializer) { this.backingStore = backingStore; this.namespace = namespace; this.converter = converter; @@ -134,11 +135,9 @@ public class OffsetStorageWriter { try { offsetsSerialized = new HashMap(); for (Map.Entry entry : toFlush.entrySet()) { - byte[] key = keySerializer.serializeOffset(namespace, - converter.fromCopycatData(entry.getKey())); + byte[] key = keySerializer.serialize(namespace, converter.fromCopycatData(entry.getKey())); ByteBuffer keyBuffer = (key != null) ? ByteBuffer.wrap(key) : null; - byte[] value = valueSerializer.serializeOffset(namespace, - converter.fromCopycatData(entry.getValue())); + byte[] value = valueSerializer.serialize(namespace, converter.fromCopycatData(entry.getValue())); ByteBuffer valueBuffer = (value != null) ? ByteBuffer.wrap(value) : null; offsetsSerialized.put(keyBuffer, valueBuffer); } diff --git a/copycat-runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat-runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java index e3a4f1dfe8b..a631c870a4d 100644 --- a/copycat-runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java +++ b/copycat-runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java @@ -76,7 +76,14 @@ public class WorkerSinkTaskTest extends ThreadedTest { super.setup(); time = new MockTime(); sinkTask = PowerMock.createMock(SinkTask.class); - workerConfig = new WorkerConfig(); + Properties workerProps = new Properties(); + // TODO: Non-avro built-ins? + workerProps.setProperty("converter", "org.apache.kafka.copycat.avro.AvroConverter"); + workerProps.setProperty("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer"); + workerProps.setProperty("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer"); + workerProps.setProperty("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer"); + workerProps.setProperty("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer"); + workerConfig = new WorkerConfig(workerProps); converter = PowerMock.createMock(Converter.class); workerTask = PowerMock.createPartialMock( WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"}, diff --git a/copycat-runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java b/copycat-runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java index fb6d9df2175..fbfae561d7c 100644 --- a/copycat-runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java +++ b/copycat-runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java @@ -66,7 +66,7 @@ public class WorkerSourceTaskTest extends ThreadedTest { private static final String CONVERTED_RECORD = "converted-record"; private ConnectorTaskId taskId = new ConnectorTaskId("job", 0); - private WorkerConfig config = new WorkerConfig(); + private WorkerConfig config; private SourceTask sourceTask; private Converter converter; private KafkaProducer producer; @@ -86,6 +86,14 @@ public class WorkerSourceTaskTest extends ThreadedTest { @Override public void setup() { super.setup(); + Properties workerProps = new Properties(); + // TODO: Non-avro built-ins? + workerProps.setProperty("converter", "org.apache.kafka.copycat.avro.AvroConverter"); + workerProps.setProperty("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer"); + workerProps.setProperty("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer"); + workerProps.setProperty("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer"); + workerProps.setProperty("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer"); + config = new WorkerConfig(workerProps); sourceTask = PowerMock.createMock(SourceTask.class); converter = PowerMock.createMock(Converter.class); producer = PowerMock.createMock(KafkaProducer.class); diff --git a/copycat-runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java b/copycat-runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java index fd3d86f2bfb..2be44b55ec0 100644 --- a/copycat-runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java +++ b/copycat-runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java @@ -17,6 +17,8 @@ package org.apache.kafka.copycat.runtime; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Time; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.copycat.cli.WorkerConfig; @@ -48,18 +50,24 @@ public class WorkerTest extends ThreadedTest { private ConnectorTaskId taskId = new ConnectorTaskId("job", 0); private Worker worker; private OffsetBackingStore offsetBackingStore = PowerMock.createMock(OffsetBackingStore.class); - private OffsetSerializer offsetKeySerializer = PowerMock.createMock(OffsetSerializer.class); - private OffsetSerializer offsetValueSerializer = PowerMock.createMock(OffsetSerializer.class); - private OffsetDeserializer offsetKeyDeserializer = PowerMock.createMock(OffsetDeserializer.class); - private OffsetDeserializer offsetValueDeserializer = PowerMock.createMock(OffsetDeserializer.class); + private Serializer offsetKeySerializer = PowerMock.createMock(Serializer.class); + private Serializer offsetValueSerializer = PowerMock.createMock(Serializer.class); + private Deserializer offsetKeyDeserializer = PowerMock.createMock(Deserializer.class); + private Deserializer offsetValueDeserializer = PowerMock.createMock(Deserializer.class); @Before public void setup() { super.setup(); // TODO: Remove schema registry URL + // TODO: Non-avro built-ins? Properties workerProps = new Properties(); workerProps.setProperty("schema.registry.url", "http://localhost:8081"); + workerProps.setProperty("converter", "org.apache.kafka.copycat.avro.AvroConverter"); + workerProps.setProperty("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer"); + workerProps.setProperty("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer"); + workerProps.setProperty("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer"); + workerProps.setProperty("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer"); WorkerConfig config = new WorkerConfig(workerProps); worker = new Worker(new MockTime(), config, offsetBackingStore, offsetKeySerializer, offsetValueSerializer, diff --git a/copycat-runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java b/copycat-runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java index cbd1f5557ee..60f4b3af6cb 100644 --- a/copycat-runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java +++ b/copycat-runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.copycat.storage; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.copycat.errors.CopycatRuntimeException; import org.apache.kafka.copycat.util.Callback; import org.easymock.Capture; @@ -49,8 +50,8 @@ public class OffsetStorageWriterTest { private OffsetBackingStore store; private Converter converter; - private OffsetSerializer keySerializer; - private OffsetSerializer valueSerializer; + private Serializer keySerializer; + private Serializer valueSerializer; private OffsetStorageWriter writer; private static Exception exception = new RuntimeException("error"); @@ -61,8 +62,8 @@ public class OffsetStorageWriterTest { public void setup() { store = PowerMock.createMock(OffsetBackingStore.class); converter = PowerMock.createMock(Converter.class); - keySerializer = PowerMock.createMock(OffsetSerializer.class); - valueSerializer = PowerMock.createMock(OffsetSerializer.class); + keySerializer = PowerMock.createMock(Serializer.class); + valueSerializer = PowerMock.createMock(Serializer.class); writer = new OffsetStorageWriter(store, NAMESPACE, converter, keySerializer, valueSerializer); service = Executors.newFixedThreadPool(1); @@ -193,9 +194,9 @@ public class OffsetStorageWriterTest { final boolean fail, final CountDownLatch waitForCompletion) { EasyMock.expect(converter.fromCopycatData(OFFSET_KEY)).andReturn(OFFSET_KEY_CONVERTED); - EasyMock.expect(keySerializer.serializeOffset(NAMESPACE, OFFSET_KEY_CONVERTED)).andReturn(OFFSET_KEY_SERIALIZED); + EasyMock.expect(keySerializer.serialize(NAMESPACE, OFFSET_KEY_CONVERTED)).andReturn(OFFSET_KEY_SERIALIZED); EasyMock.expect(converter.fromCopycatData(OFFSET_VALUE)).andReturn(OFFSET_VALUE_CONVERTED); - EasyMock.expect(valueSerializer.serializeOffset(NAMESPACE, OFFSET_VALUE_CONVERTED)).andReturn(OFFSET_VALUE_SERIALIZED); + EasyMock.expect(valueSerializer.serialize(NAMESPACE, OFFSET_VALUE_CONVERTED)).andReturn(OFFSET_VALUE_SERIALIZED); final Capture> storeCallback = Capture.newInstance(); EasyMock.expect(store.set(EasyMock.eq(NAMESPACE), EasyMock.eq(OFFSETS_SERIALIZED),