From 3803e5cb37cb602ff9eab5562ff8db3a2dd79b45 Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Mon, 31 Aug 2015 12:26:16 -0700 Subject: [PATCH] KAFKA-2475: Make Copycat only have a Converter class instead of Serializer, Deserializer, and Converter. The Converter class now translates directly between byte[] and Copycat's data API instead of requiring an intermediate runtime type like Avro's GenericRecord or Jackson's JsonNode. Author: Ewen Cheslack-Postava Reviewers: Gwen Shapira Closes #172 from ewencp/kafka-2475-unified-serializer-converter and squashes the following commits: 566c52f [Ewen Cheslack-Postava] Checkstyle fixes 320d0df [Ewen Cheslack-Postava] Restrict offset format. 85797e7 [Ewen Cheslack-Postava] Add StringConverter for using Copycat with raw strings. 698d65c [Ewen Cheslack-Postava] Move and update outdated comment about handing of types for BYTES type in Copycat. 4bed051 [Ewen Cheslack-Postava] KAFKA-2475: Make Copycat only have a Converter class instead of Serializer, Deserializer, and Converter. --- .../kafka/common/config/AbstractConfig.java | 15 + .../common/config/AbstractConfigTest.java | 13 + config/copycat-standalone.properties | 17 +- .../kafka/copycat/data/CopycatSchema.java | 92 ++++-- .../org/apache/kafka/copycat/data/Schema.java | 1 - .../kafka/copycat/data/SchemaBuilder.java | 3 +- .../kafka/copycat/source/SourceRecord.java | 41 +-- .../kafka/copycat/storage/Converter.java | 23 +- .../copycat/storage/OffsetStorageReader.java | 13 +- .../copycat/storage/StringConverter.java | 81 +++++ .../copycat/storage/StringConverterTest.java | 83 +++++ .../copycat/file/FileStreamSourceTask.java | 33 +- .../file/FileStreamSourceTaskTest.java | 23 +- .../kafka/copycat/json/JsonConverter.java | 287 ++++++++++++------ .../kafka/copycat/json/JsonDeserializer.java | 31 +- .../kafka/copycat/json/JsonSerializer.java | 15 - .../kafka/copycat/json/JsonConverterTest.java | 243 ++++++++++++--- .../kafka/copycat/cli/WorkerConfig.java | 35 +-- .../apache/kafka/copycat/runtime/Worker.java | 72 ++--- .../kafka/copycat/runtime/WorkerSinkTask.java | 30 +- .../copycat/runtime/WorkerSourceTask.java | 39 ++- .../storage/OffsetStorageReaderImpl.java | 39 ++- .../copycat/storage/OffsetStorageWriter.java | 37 ++- .../kafka/copycat/storage/OffsetUtils.java | 46 +++ .../copycat/runtime/WorkerSinkTaskTest.java | 26 +- .../copycat/runtime/WorkerSourceTaskTest.java | 60 ++-- .../kafka/copycat/runtime/WorkerTest.java | 18 +- .../storage/OffsetStorageWriterTest.java | 42 +-- tests/kafkatest/tests/copycat_test.py | 33 +- .../templates/copycat-file-sink.properties | 2 +- .../templates/copycat-file-source.properties | 2 +- .../templates/copycat-standalone.properties | 19 +- 32 files changed, 994 insertions(+), 520 deletions(-) create mode 100644 copycat/api/src/main/java/org/apache/kafka/copycat/storage/StringConverter.java create mode 100644 copycat/api/src/test/java/org/apache/kafka/copycat/storage/StringConverterTest.java create mode 100644 copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index 774701a91c3..12a19274ffb 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -115,6 +115,21 @@ public class AbstractConfig { return copy; } + /** + * Gets all original settings with the given prefix, 
stripping the prefix before adding it to the output. + * + * @param prefix the prefix to use as a filter + * @return a Map containing the settings with the prefix + */ + public Map originalsWithPrefix(String prefix) { + Map result = new HashMap(); + for (Map.Entry entry : originals.entrySet()) { + if (entry.getKey().startsWith(prefix) && entry.getKey().length() > prefix.length()) + result.put(entry.getKey().substring(prefix.length()), entry.getValue()); + } + return result; + } + public Map values() { return new HashMap(values); } diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java index db1b0ee9113..28064ec8496 100644 --- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java +++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java @@ -17,10 +17,12 @@ import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.metrics.MetricsReporter; import org.junit.Test; +import java.util.HashMap; import java.util.Map; import java.util.Properties; import static org.junit.Assert.fail; +import static org.junit.Assert.assertEquals; public class AbstractConfigTest { @@ -35,6 +37,17 @@ public class AbstractConfigTest { testInvalidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter,"); } + @Test + public void testOriginalsWithPrefix() { + Properties props = new Properties(); + props.put("foo.bar", "abc"); + props.put("setting", "def"); + TestConfig config = new TestConfig(props); + Map expected = new HashMap<>(); + expected.put("bar", "abc"); + assertEquals(expected, config.originalsWithPrefix("foo.")); + } + private void testValidInputs(String configValue) { Properties props = new Properties(); props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, configValue); diff --git a/config/copycat-standalone.properties b/config/copycat-standalone.properties index cf3b268ec25..fd264b565f7 100644 --- a/config/copycat-standalone.properties +++ b/config/copycat-standalone.properties @@ -16,12 +16,21 @@ # These are defaults. This file just demonstrates how to override some settings. bootstrap.servers=localhost:9092 +# The converters specify the format of data in Kafka and how to translate it into Copycat data. Every Copycat user will +# need to configure these based on the format they want their data in when loaded from or stored into Kafka key.converter=org.apache.kafka.copycat.json.JsonConverter value.converter=org.apache.kafka.copycat.json.JsonConverter -key.serializer=org.apache.kafka.copycat.json.JsonSerializer -value.serializer=org.apache.kafka.copycat.json.JsonSerializer -key.deserializer=org.apache.kafka.copycat.json.JsonDeserializer -value.deserializer=org.apache.kafka.copycat.json.JsonDeserializer +# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply +# it to +key.converter.schemas.enable=true +value.converter.schemas.enable=true + +# The offset converter is configurable and must be specified, but most users will always want to use the built-in default. +# Offset data is never visible outside of Copcyat. 
+offset.key.converter=org.apache.kafka.copycat.json.JsonConverter +offset.value.converter=org.apache.kafka.copycat.json.JsonConverter +offset.key.converter.schemas.enable=false +offset.value.converter.schemas.enable=false offset.storage.file.filename=/tmp/copycat.offsets # Flush much faster than normal, which is useful for testing/debugging diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java index c823f284542..809496a23f0 100644 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java @@ -23,20 +23,37 @@ import java.nio.ByteBuffer; import java.util.*; public class CopycatSchema implements Schema { - private static final Map> SCHEMA_TYPE_CLASSES = new HashMap<>(); + /** + * Maps Schema.Types to a list of Java classes that can be used to represent them. + */ + private static final Map> SCHEMA_TYPE_CLASSES = new HashMap<>(); + + /** + * Maps the Java classes to the corresponding Schema.Type. + */ + private static final Map, Type> JAVA_CLASS_SCHEMA_TYPES = new HashMap<>(); + static { - SCHEMA_TYPE_CLASSES.put(Type.INT8, Byte.class); - SCHEMA_TYPE_CLASSES.put(Type.INT16, Short.class); - SCHEMA_TYPE_CLASSES.put(Type.INT32, Integer.class); - SCHEMA_TYPE_CLASSES.put(Type.INT64, Long.class); - SCHEMA_TYPE_CLASSES.put(Type.FLOAT32, Float.class); - SCHEMA_TYPE_CLASSES.put(Type.FLOAT64, Double.class); - SCHEMA_TYPE_CLASSES.put(Type.BOOLEAN, Boolean.class); - SCHEMA_TYPE_CLASSES.put(Type.STRING, String.class); - SCHEMA_TYPE_CLASSES.put(Type.ARRAY, List.class); - SCHEMA_TYPE_CLASSES.put(Type.MAP, Map.class); - SCHEMA_TYPE_CLASSES.put(Type.STRUCT, Struct.class); - // Bytes are handled as a special case + SCHEMA_TYPE_CLASSES.put(Type.INT8, Arrays.asList((Class) Byte.class)); + SCHEMA_TYPE_CLASSES.put(Type.INT16, Arrays.asList((Class) Short.class)); + SCHEMA_TYPE_CLASSES.put(Type.INT32, Arrays.asList((Class) Integer.class)); + SCHEMA_TYPE_CLASSES.put(Type.INT64, Arrays.asList((Class) Long.class)); + SCHEMA_TYPE_CLASSES.put(Type.FLOAT32, Arrays.asList((Class) Float.class)); + SCHEMA_TYPE_CLASSES.put(Type.FLOAT64, Arrays.asList((Class) Double.class)); + SCHEMA_TYPE_CLASSES.put(Type.BOOLEAN, Arrays.asList((Class) Boolean.class)); + SCHEMA_TYPE_CLASSES.put(Type.STRING, Arrays.asList((Class) String.class)); + // Bytes are special and have 2 representations. byte[] causes problems because it doesn't handle equals() and + // hashCode() like we want objects to, so we support both byte[] and ByteBuffer. Using plain byte[] can cause + // those methods to fail, so ByteBuffers are recommended + SCHEMA_TYPE_CLASSES.put(Type.BYTES, Arrays.asList((Class) byte[].class, (Class) ByteBuffer.class)); + SCHEMA_TYPE_CLASSES.put(Type.ARRAY, Arrays.asList((Class) List.class)); + SCHEMA_TYPE_CLASSES.put(Type.MAP, Arrays.asList((Class) Map.class)); + SCHEMA_TYPE_CLASSES.put(Type.STRUCT, Arrays.asList((Class) Struct.class)); + + for (Map.Entry> schemaClasses : SCHEMA_TYPE_CLASSES.entrySet()) { + for (Class schemaClass : schemaClasses.getValue()) + JAVA_CLASS_SCHEMA_TYPES.put(schemaClass, schemaClasses.getKey()); + } } // The type of the field @@ -158,14 +175,19 @@ public class CopycatSchema implements Schema { return; } - // Special case for bytes. byte[] causes problems because it doesn't handle equals()/hashCode() like we want - // objects to, so we support both byte[] and ByteBuffer. 
Using plain byte[] can cause those methods to fail, so - // ByteBuffers are recommended - if (schema.type() == Type.BYTES && (value instanceof byte[] || value instanceof ByteBuffer)) - return; - Class expectedClass = SCHEMA_TYPE_CLASSES.get(schema.type()); - if (expectedClass == null || !expectedClass.isInstance(value)) - throw new DataException("Invalid value: expected " + expectedClass + " for type " + schema.type() + " but tried to use " + value.getClass()); + final List expectedClasses = SCHEMA_TYPE_CLASSES.get(schema.type()); + if (expectedClasses == null) + throw new DataException("Invalid Java object for schema type " + schema.type() + ": " + value.getClass()); + + boolean foundMatch = false; + for (Class expectedClass : expectedClasses) { + if (expectedClass.isInstance(value)) { + foundMatch = true; + break; + } + } + if (!foundMatch) + throw new DataException("Invalid Java object for schema type " + schema.type() + ": " + value.getClass()); switch (schema.type()) { case STRUCT: @@ -232,4 +254,32 @@ public class CopycatSchema implements Schema { else return "Schema{" + type + "}"; } + + + /** + * Get the {@link Type} associated with the the given class. + * + * @param klass the Class to + * @return the corresponding type, nor null if there is no matching type + */ + public static Type schemaType(Class klass) { + synchronized (JAVA_CLASS_SCHEMA_TYPES) { + Type schemaType = JAVA_CLASS_SCHEMA_TYPES.get(klass); + if (schemaType != null) + return schemaType; + + // Since the lookup only checks the class, we need to also try + for (Map.Entry, Type> entry : JAVA_CLASS_SCHEMA_TYPES.entrySet()) { + try { + klass.asSubclass(entry.getKey()); + // Cache this for subsequent lookups + JAVA_CLASS_SCHEMA_TYPES.put(klass, entry.getValue()); + return entry.getValue(); + } catch (ClassCastException e) { + // Expected, ignore + } + } + } + return null; + } } diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java index 5ceb57d6f5d..4ece21d9ecd 100644 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java @@ -92,7 +92,6 @@ public interface Schema { Schema OPTIONAL_STRING_SCHEMA = SchemaBuilder.string().optional().build(); Schema OPTIONAL_BYTES_SCHEMA = SchemaBuilder.bytes().optional().build(); - /** * @return the type of this schema */ diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java index fe9d474f427..d9c149d42c9 100644 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java @@ -21,6 +21,7 @@ import org.apache.kafka.copycat.errors.DataException; import org.apache.kafka.copycat.errors.SchemaBuilderException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; /** @@ -346,7 +347,7 @@ public class SchemaBuilder implements Schema { * @return the {@link Schema} */ public Schema build() { - return new CopycatSchema(type, isOptional(), defaultValue, name, version, doc, fields, keySchema, valueSchema); + return new CopycatSchema(type, isOptional(), defaultValue, name, version, doc, fields == null ? 
null : Collections.unmodifiableList(fields), keySchema, valueSchema); } /** diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java index 05286a1165d..7f54c10c7de 100644 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/source/SourceRecord.java @@ -21,6 +21,8 @@ import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.copycat.connector.CopycatRecord; import org.apache.kafka.copycat.data.Schema; +import java.util.Map; + /** *
* SourceRecords are generated by SourceTasks and passed to Copycat for storage in @@ -41,47 +43,32 @@ import org.apache.kafka.copycat.data.Schema; */ @InterfaceStability.Unstable public class SourceRecord extends CopycatRecord { - private final Schema sourcePartitionSchema; - private final Object sourcePartition; - private final Schema sourceOffsetSchema; - private final Object sourceOffset; + private final Map sourcePartition; + private final Map sourceOffset; - public SourceRecord(Schema sourcePartitionSchema, Object sourcePartition, - Schema sourceOffsetSchema, Object sourceOffset, + public SourceRecord(Map sourcePartition, Map sourceOffset, String topic, Integer partition, Schema valueSchema, Object value) { - this(sourcePartitionSchema, sourcePartition, sourceOffsetSchema, sourceOffset, topic, partition, null, null, valueSchema, value); + this(sourcePartition, sourceOffset, topic, partition, null, null, valueSchema, value); } - public SourceRecord(Schema sourcePartitionSchema, Object sourcePartition, - Schema sourceOffsetSchema, Object sourceOffset, + public SourceRecord(Map sourcePartition, Map sourceOffset, String topic, Schema valueSchema, Object value) { - this(sourcePartitionSchema, sourcePartition, sourceOffsetSchema, sourceOffset, topic, null, null, null, valueSchema, value); + this(sourcePartition, sourceOffset, topic, null, null, null, valueSchema, value); } - public SourceRecord(Schema sourcePartitionSchema, Object sourcePartition, - Schema sourceOffsetSchema, Object sourceOffset, + public SourceRecord(Map sourcePartition, Map sourceOffset, String topic, Integer partition, Schema keySchema, Object key, Schema valueSchema, Object value) { super(topic, partition, keySchema, key, valueSchema, value); - this.sourcePartitionSchema = sourcePartitionSchema; this.sourcePartition = sourcePartition; - this.sourceOffsetSchema = sourceOffsetSchema; this.sourceOffset = sourceOffset; } - public Schema sourcePartitionSchema() { - return sourcePartitionSchema; - } - - public Object sourcePartition() { + public Map sourcePartition() { return sourcePartition; } - public Schema sourceOffsetSchema() { - return sourceOffsetSchema; - } - - public Object sourceOffset() { + public Map sourceOffset() { return sourceOffset; } @@ -96,10 +83,6 @@ public class SourceRecord extends CopycatRecord { SourceRecord that = (SourceRecord) o; - if (sourcePartitionSchema != null ? !sourcePartitionSchema.equals(that.sourcePartitionSchema) : that.sourcePartitionSchema != null) - return false; - if (sourceOffsetSchema != null ? !sourceOffsetSchema.equals(that.sourceOffsetSchema) : that.sourceOffsetSchema != null) - return false; if (sourcePartition != null ? !sourcePartition.equals(that.sourcePartition) : that.sourcePartition != null) return false; if (sourceOffset != null ? !sourceOffset.equals(that.sourceOffset) : that.sourceOffset != null) @@ -111,8 +94,6 @@ public class SourceRecord extends CopycatRecord { @Override public int hashCode() { int result = super.hashCode(); - result = 31 * result + (sourcePartitionSchema != null ? sourcePartitionSchema.hashCode() : 0); - result = 31 * result + (sourceOffsetSchema != null ? sourceOffsetSchema.hashCode() : 0); result = 31 * result + (sourcePartition != null ? sourcePartition.hashCode() : 0); result = 31 * result + (sourceOffset != null ? 
sourceOffset.hashCode() : 0); return result; diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java index dd2068d7d44..d51b789db3e 100644 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/Converter.java @@ -21,28 +21,37 @@ import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.copycat.data.Schema; import org.apache.kafka.copycat.data.SchemaAndValue; +import java.util.Map; + /** * The Converter interface provides support for translating between Copycat's runtime data format - * and the "native" runtime format used by the serialization layer. This is used to translate - * two types of data: records and offsets. The (de)serialization is performed by a separate - * component -- the producer or consumer serializer or deserializer for records or a Copycat - * serializer or deserializer for offsets. + * and byte[]. Internally, this likely includes an intermediate step to the format used by the serialization + * layer (e.g. JsonNode, GenericRecord, Message). */ @InterfaceStability.Unstable -public interface Converter { +public interface Converter { + + /** + * Configure this class. + * @param configs configs in key/value pairs + * @param isKey whether is for key or value + */ + void configure(Map configs, boolean isKey); /** * Convert a Copycat data object to a native object for serialization. + * @param topic the topic associated with the data * @param schema the schema for the value * @param value the value to convert * @return */ - T fromCopycatData(Schema schema, Object value); + byte[] fromCopycatData(String topic, Schema schema, Object value); /** * Convert a native object to a Copycat data object. + * @param topic the topic associated with the data * @param value the value to convert * @return an object containing the {@link Schema} and the converted value */ - SchemaAndValue toCopycatData(T value); + SchemaAndValue toCopycatData(String topic, byte[] value); } diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java index b51fbdebc55..95d2c04e32d 100644 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReader.java @@ -18,15 +18,20 @@ package org.apache.kafka.copycat.storage; import org.apache.kafka.common.annotation.InterfaceStability; -import org.apache.kafka.copycat.data.SchemaAndValue; import java.util.Collection; import java.util.Map; /** + *
* OffsetStorageReader provides access to the offset storage used by sources. This can be used by * connectors to determine offsets to start consuming data from. This is most commonly used during * initialization of a task, but can also be used during runtime, e.g. when reconfiguring a task. + *
+ *
+ * Offsets are always defined as Maps of Strings to primitive types, i.e. all types supported by + * {@link org.apache.kafka.copycat.data.Schema} other than Array, Map, and Struct.
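+ *
+ * An illustrative sketch (not a required format), borrowing the "filename"/"position" field names
+ * used by the FileStreamSourceTask changes in this patch; a source task reaches this reader through
+ * its SourceTaskContext:
+ * <pre>
+ *   Map&lt;String, Object&gt; offset = context.offsetStorageReader()
+ *       .offset(Collections.singletonMap("filename", filename));
+ *   Long lastPosition = (offset == null) ? null : (Long) offset.get("position");
+ * </pre>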
*/ @InterfaceStability.Unstable public interface OffsetStorageReader { @@ -37,12 +42,12 @@ public interface OffsetStorageReader { * @param partition object uniquely identifying the partition of data * @return object uniquely identifying the offset in the partition of data */ - SchemaAndValue offset(SchemaAndValue partition); + Map offset(Map partition); /** *
* Get a set of offsets for the specified partition identifiers. This may be more efficient - * than calling {@link #offset(SchemaAndValue)} repeatedly. + * than calling {@link #offset(Map)} repeatedly. *
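+ * For example (with illustrative partition names), {@code offsets(Arrays.asList(filePartitionA, filePartitionB))}
+ * returns a map from those same partition maps to their decoded offsets.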
*
* Note that when errors occur, this method omits the associated data and tries to return as @@ -56,5 +61,5 @@ public interface OffsetStorageReader { * @param partitions set of identifiers for partitions of data * @return a map of partition identifiers to decoded offsets */ - Map offsets(Collection partitions); + Map, Map> offsets(Collection> partitions); } diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/storage/StringConverter.java b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/StringConverter.java new file mode 100644 index 00000000000..8d708f81e83 --- /dev/null +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/storage/StringConverter.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.storage; + +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.copycat.data.Schema; +import org.apache.kafka.copycat.data.SchemaAndValue; +import org.apache.kafka.copycat.errors.DataException; + +import java.util.HashMap; +import java.util.Map; + +/** + * {@link Converter} implementation that only supports serializing to strings. When converting Copycat data to bytes, + * the schema will be ignored and {@link Object#toString()} will always be invoked to convert the data to a String. + * When converting from bytes to Copycat format, the converter will only ever return an optional string schema and + * a string or null. + * + * Encoding configuration is identical to {@link StringSerializer} and {@link StringDeserializer}, but for convenience + * this class can also be configured to use the same encoding for both encoding and decoding with the converter.encoding + * setting. 
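+ *
+ * A minimal usage sketch (illustrative only; it mirrors the accompanying StringConverterTest):
+ * <pre>
+ *   StringConverter converter = new StringConverter();
+ *   converter.configure(Collections.singletonMap("converter.encoding", "UTF-16"), true);
+ *   byte[] bytes = converter.fromCopycatData("my-topic", Schema.STRING_SCHEMA, "a string");
+ *   SchemaAndValue parsed = converter.toCopycatData("my-topic", bytes); // optional string schema, "a string"
+ * </pre>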
+ */ +public class StringConverter implements Converter { + private final StringSerializer serializer = new StringSerializer(); + private final StringDeserializer deserializer = new StringDeserializer(); + + public StringConverter() { + } + + @Override + public void configure(Map configs, boolean isKey) { + Map serializerConfigs = new HashMap<>(); + serializerConfigs.putAll(configs); + Map deserializerConfigs = new HashMap<>(); + deserializerConfigs.putAll(configs); + + Object encodingValue = configs.get("converter.encoding"); + if (encodingValue != null) { + serializerConfigs.put("serializer.encoding", encodingValue); + deserializerConfigs.put("deserializer.encoding", encodingValue); + } + + serializer.configure(serializerConfigs, isKey); + deserializer.configure(deserializerConfigs, isKey); + } + + @Override + public byte[] fromCopycatData(String topic, Schema schema, Object value) { + try { + return serializer.serialize(topic, value == null ? null : value.toString()); + } catch (SerializationException e) { + throw new DataException("Failed to serialize to a string: ", e); + } + } + + @Override + public SchemaAndValue toCopycatData(String topic, byte[] value) { + try { + return new SchemaAndValue(Schema.OPTIONAL_STRING_SCHEMA, deserializer.deserialize(topic, value)); + } catch (SerializationException e) { + throw new DataException("Failed to deserialize string: ", e); + } + } +} diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/storage/StringConverterTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/storage/StringConverterTest.java new file mode 100644 index 00000000000..3ea69c1923e --- /dev/null +++ b/copycat/api/src/test/java/org/apache/kafka/copycat/storage/StringConverterTest.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package org.apache.kafka.copycat.storage; + +import org.apache.kafka.copycat.data.Schema; +import org.apache.kafka.copycat.data.SchemaAndValue; +import org.junit.Test; + +import java.io.UnsupportedEncodingException; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertArrayEquals; + +public class StringConverterTest { + private static final String TOPIC = "topic"; + private static final String SAMPLE_STRING = "a string"; + + private StringConverter converter = new StringConverter(); + + @Test + public void testStringToBytes() throws UnsupportedEncodingException { + assertArrayEquals(SAMPLE_STRING.getBytes("UTF8"), converter.fromCopycatData(TOPIC, Schema.STRING_SCHEMA, SAMPLE_STRING)); + } + + @Test + public void testNonStringToBytes() throws UnsupportedEncodingException { + assertArrayEquals("true".getBytes("UTF8"), converter.fromCopycatData(TOPIC, Schema.BOOLEAN_SCHEMA, true)); + } + + @Test + public void testNullToBytes() { + assertEquals(null, converter.fromCopycatData(TOPIC, Schema.OPTIONAL_STRING_SCHEMA, null)); + } + + @Test + public void testToBytesIgnoresSchema() throws UnsupportedEncodingException { + assertArrayEquals("true".getBytes("UTF8"), converter.fromCopycatData(TOPIC, null, true)); + } + + @Test + public void testToBytesNonUtf8Encoding() throws UnsupportedEncodingException { + converter.configure(Collections.singletonMap("converter.encoding", "UTF-16"), true); + assertArrayEquals(SAMPLE_STRING.getBytes("UTF-16"), converter.fromCopycatData(TOPIC, Schema.STRING_SCHEMA, SAMPLE_STRING)); + } + + @Test + public void testBytesToString() { + SchemaAndValue data = converter.toCopycatData(TOPIC, SAMPLE_STRING.getBytes()); + assertEquals(Schema.OPTIONAL_STRING_SCHEMA, data.schema()); + assertEquals(SAMPLE_STRING, data.value()); + } + + @Test + public void testBytesNullToString() { + SchemaAndValue data = converter.toCopycatData(TOPIC, null); + assertEquals(Schema.OPTIONAL_STRING_SCHEMA, data.schema()); + assertEquals(null, data.value()); + } + + @Test + public void testBytesToStringNonUtf8Encoding() throws UnsupportedEncodingException { + converter.configure(Collections.singletonMap("converter.encoding", "UTF-16"), true); + SchemaAndValue data = converter.toCopycatData(TOPIC, SAMPLE_STRING.getBytes("UTF-16")); + assertEquals(Schema.OPTIONAL_STRING_SCHEMA, data.schema()); + assertEquals(SAMPLE_STRING, data.value()); + } +} diff --git a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java index a841386ea59..91292e9ceb1 100644 --- a/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java +++ b/copycat/file/src/main/java/org/apache/kafka/copycat/file/FileStreamSourceTask.java @@ -18,7 +18,6 @@ package org.apache.kafka.copycat.file; import org.apache.kafka.copycat.data.Schema; -import org.apache.kafka.copycat.data.SchemaAndValue; import org.apache.kafka.copycat.errors.CopycatException; import org.apache.kafka.copycat.source.SourceRecord; import org.apache.kafka.copycat.source.SourceTask; @@ -26,17 +25,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.*; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; +import java.util.*; /** * FileStreamSourceTask reads from stdin or a file. 
*/ public class FileStreamSourceTask extends SourceTask { private static final Logger log = LoggerFactory.getLogger(FileStreamSourceTask.class); - private static final Schema OFFSET_KEY_SCHEMA = Schema.STRING_SCHEMA; - private static final Schema OFFSET_VALUE_SCHEMA = Schema.OPTIONAL_INT64_SCHEMA; + public static final String FILENAME_FIELD = "filename"; + public static final String POSITION_FIELD = "position"; private static final Schema VALUE_SCHEMA = Schema.STRING_SCHEMA; private String filename; @@ -66,14 +63,14 @@ public class FileStreamSourceTask extends SourceTask { if (stream == null) { try { stream = new FileInputStream(filename); - SchemaAndValue offsetWithSchema = context.offsetStorageReader().offset(new SchemaAndValue(OFFSET_KEY_SCHEMA, filename)); - if (offsetWithSchema != null) { - if (!offsetWithSchema.schema().equals(OFFSET_VALUE_SCHEMA)) - throw new CopycatException("Unexpected offset schema."); - Long lastRecordedOffset = (Long) offsetWithSchema.value(); + Map offset = context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, filename)); + if (offset != null) { + Object lastRecordedOffset = offset.get(POSITION_FIELD); + if (lastRecordedOffset != null && !(lastRecordedOffset instanceof Long)) + throw new CopycatException("Offset position is the incorrect type"); if (lastRecordedOffset != null) { log.debug("Found previous offset, trying to skip to file offset {}", lastRecordedOffset); - long skipLeft = lastRecordedOffset; + long skipLeft = (Long) lastRecordedOffset; while (skipLeft > 0) { try { long skipped = stream.skip(skipLeft); @@ -85,7 +82,7 @@ public class FileStreamSourceTask extends SourceTask { } log.debug("Skipped to offset {}", lastRecordedOffset); } - streamOffset = (lastRecordedOffset != null) ? lastRecordedOffset : 0L; + streamOffset = (lastRecordedOffset != null) ? 
(Long) lastRecordedOffset : 0L; } else { streamOffset = 0L; } @@ -130,7 +127,7 @@ public class FileStreamSourceTask extends SourceTask { if (line != null) { if (records == null) records = new ArrayList<>(); - records.add(new SourceRecord(OFFSET_KEY_SCHEMA, filename, OFFSET_VALUE_SCHEMA, streamOffset, topic, VALUE_SCHEMA, line)); + records.add(new SourceRecord(offsetKey(filename), offsetValue(streamOffset), topic, VALUE_SCHEMA, line)); } new ArrayList(); } while (line != null); @@ -193,4 +190,12 @@ public class FileStreamSourceTask extends SourceTask { this.notify(); } } + + private Map offsetKey(String filename) { + return Collections.singletonMap(FILENAME_FIELD, filename); + } + + private Map offsetValue(Long pos) { + return Collections.singletonMap(POSITION_FIELD, pos); + } } diff --git a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java index ab89b6a2e6b..d2781c963bc 100644 --- a/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java +++ b/copycat/file/src/test/java/org/apache/kafka/copycat/file/FileStreamSourceTaskTest.java @@ -17,7 +17,6 @@ package org.apache.kafka.copycat.file; -import org.apache.kafka.copycat.data.SchemaAndValue; import org.apache.kafka.copycat.errors.CopycatException; import org.apache.kafka.copycat.source.SourceRecord; import org.apache.kafka.copycat.source.SourceTaskContext; @@ -31,7 +30,9 @@ import org.powermock.api.easymock.PowerMock; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Properties; import static org.junit.Assert.assertEquals; @@ -89,7 +90,8 @@ public class FileStreamSourceTaskTest { assertEquals(1, records.size()); assertEquals(TOPIC, records.get(0).topic()); assertEquals("partial line finished", records.get(0).value()); - assertEquals(22L, records.get(0).sourceOffset()); + assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(0).sourcePartition()); + assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 22L), records.get(0).sourceOffset()); assertEquals(null, task.poll()); // Different line endings, and make sure the final \r doesn't result in a line until we can @@ -99,20 +101,25 @@ public class FileStreamSourceTaskTest { records = task.poll(); assertEquals(4, records.size()); assertEquals("line1", records.get(0).value()); - assertEquals(28L, records.get(0).sourceOffset()); + assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(0).sourcePartition()); + assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 28L), records.get(0).sourceOffset()); assertEquals("line2", records.get(1).value()); - assertEquals(35L, records.get(1).sourceOffset()); + assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(1).sourcePartition()); + assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 35L), records.get(1).sourceOffset()); assertEquals("line3", records.get(2).value()); - assertEquals(41L, records.get(2).sourceOffset()); + assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(2).sourcePartition()); + assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 
41L), records.get(2).sourceOffset()); assertEquals("line4", records.get(3).value()); - assertEquals(47L, records.get(3).sourceOffset()); + assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(3).sourcePartition()); + assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 47L), records.get(3).sourceOffset()); os.write("subsequent text".getBytes()); os.flush(); records = task.poll(); assertEquals(1, records.size()); assertEquals("", records.get(0).value()); - assertEquals(48L, records.get(0).sourceOffset()); + assertEquals(Collections.singletonMap(FileStreamSourceTask.FILENAME_FIELD, tempFile.getAbsolutePath()), records.get(0).sourcePartition()); + assertEquals(Collections.singletonMap(FileStreamSourceTask.POSITION_FIELD, 48L), records.get(0).sourceOffset()); task.stop(); } @@ -135,6 +142,6 @@ public class FileStreamSourceTaskTest { private void expectOffsetLookupReturnNone() { - EasyMock.expect(offsetStorageReader.offset(EasyMock.anyObject(SchemaAndValue.class))).andReturn(null); + EasyMock.expect(offsetStorageReader.offset(EasyMock.anyObject(Map.class))).andReturn(null); } } \ No newline at end of file diff --git a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java index 67df11d4b81..1841640196d 100644 --- a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java +++ b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.copycat.data.*; import org.apache.kafka.copycat.errors.DataException; import org.apache.kafka.copycat.storage.Converter; @@ -32,7 +33,9 @@ import java.util.*; /** * Implementation of Converter that uses JSON to store schemas and objects. */ -public class JsonConverter implements Converter { +public class JsonConverter implements Converter { + private static final String SCHEMAS_ENABLE_CONFIG = "schemas.enable"; + private static final boolean SCHEMAS_ENABLE_DEFAULT = true; private static final HashMap TO_COPYCAT_CONVERTERS = new HashMap<>(); @@ -117,9 +120,7 @@ public class JsonConverter implements Converter { public Object convert(Schema schema, JsonNode value) { if (value.isNull()) return checkOptionalAndDefault(schema); - Schema elemSchema = schema.valueSchema(); - if (elemSchema == null) - throw new DataException("Array schema did not specify the element type"); + Schema elemSchema = schema == null ? null : schema.valueSchema(); ArrayList result = new ArrayList<>(); for (JsonNode elem : value) { result.add(convertToCopycat(elemSchema, elem)); @@ -132,13 +133,14 @@ public class JsonConverter implements Converter { public Object convert(Schema schema, JsonNode value) { if (value.isNull()) return checkOptionalAndDefault(schema); - Schema keySchema = schema.keySchema(); - Schema valueSchema = schema.valueSchema(); + Schema keySchema = schema == null ? null : schema.keySchema(); + Schema valueSchema = schema == null ? null : schema.valueSchema(); // If the map uses strings for keys, it should be encoded in the natural JSON format. 
If it uses other - // primitive types or a complex type as a key, it will be encoded as a list of pairs + // primitive types or a complex type as a key, it will be encoded as a list of pairs. If we don't have a + // schema, we default to encoding in a Map. Map result = new HashMap<>(); - if (keySchema.type() == Schema.Type.STRING) { + if (schema == null || keySchema.type() == Schema.Type.STRING) { if (!value.isObject()) throw new DataException("Map's with string fields should be encoded as JSON objects, but found " + value.getNodeType()); Iterator> fieldIt = value.fields(); @@ -182,24 +184,73 @@ public class JsonConverter implements Converter { } }); + + } + + private boolean enableSchemas = SCHEMAS_ENABLE_DEFAULT; + + private final JsonSerializer serializer = new JsonSerializer(); + private final JsonDeserializer deserializer = new JsonDeserializer(); + + @Override + public void configure(Map configs, boolean isKey) { + Object enableConfigsVal = configs.get(SCHEMAS_ENABLE_CONFIG); + if (enableConfigsVal != null) + enableSchemas = enableConfigsVal.toString().equals("true"); + + serializer.configure(configs, isKey); + deserializer.configure(configs, isKey); } @Override - public JsonNode fromCopycatData(Schema schema, Object value) { - return convertToJsonWithSchemaEnvelope(schema, value); + public byte[] fromCopycatData(String topic, Schema schema, Object value) { + JsonNode jsonValue = enableSchemas ? convertToJsonWithEnvelope(schema, value) : convertToJsonWithoutEnvelope(schema, value); + try { + return serializer.serialize(topic, jsonValue); + } catch (SerializationException e) { + throw new DataException("Converting Copycat data to byte[] failed due to serialization error: ", e); + } } @Override - public SchemaAndValue toCopycatData(JsonNode value) { - if (!value.isObject() || value.size() != 2 || !value.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !value.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)) + public SchemaAndValue toCopycatData(String topic, byte[] value) { + JsonNode jsonValue; + try { + jsonValue = deserializer.deserialize(topic, value); + } catch (SerializationException e) { + throw new DataException("Converting byte[] to Copycat data failed due to serialization error: ", e); + } + + if (enableSchemas && (jsonValue == null || !jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has("schema") || !jsonValue.has("payload"))) + throw new DataException("JsonDeserializer with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields"); + + // The deserialized data should either be an envelope object containing the schema and the payload or the schema + // was stripped during serialization and we need to fill in an all-encompassing schema. 
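+ // For illustration (formats taken from the accompanying JsonConverterTest): with schemas.enable=true the
+ // bytes decode to an envelope such as { "schema": { "type": "boolean" }, "payload": true }, whereas with
+ // schemas.enable=false only the bare payload (e.g. true) is present, so the schema is left null here and
+ // the payload's types are later inferred from the JSON node types.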
+ if (!enableSchemas) { + ObjectNode envelope = JsonNodeFactory.instance.objectNode(); + envelope.set("schema", null); + envelope.set("payload", jsonValue); + jsonValue = envelope; + } + + return jsonToCopycat(jsonValue); + } + + private SchemaAndValue jsonToCopycat(JsonNode jsonValue) { + if (jsonValue == null) + return SchemaAndValue.NULL; + + if (!jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !jsonValue.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)) throw new DataException("JSON value converted to Copycat must be in envelope containing schema"); - Schema schema = asCopycatSchema(value.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); - return new SchemaAndValue(schema, convertToCopycat(schema, value.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))); + Schema schema = asCopycatSchema(jsonValue.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); + return new SchemaAndValue(schema, convertToCopycat(schema, jsonValue.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))); } - private static ObjectNode asJsonSchema(Schema schema) { + if (schema == null) + return null; + final ObjectNode jsonSchema; switch (schema.type()) { case BOOLEAN: @@ -369,16 +420,22 @@ public class JsonConverter implements Converter { * @param value the value * @return JsonNode-encoded version */ - private static JsonNode convertToJsonWithSchemaEnvelope(Schema schema, Object value) { + private static JsonNode convertToJsonWithEnvelope(Schema schema, Object value) { return new JsonSchema.Envelope(asJsonSchema(schema), convertToJson(schema, value)).toJsonNode(); } + private static JsonNode convertToJsonWithoutEnvelope(Schema schema, Object value) { + return convertToJson(schema, value); + } + /** * Convert this object, in the org.apache.kafka.copycat.data format, into a JSON object, returning both the schema * and the converted object. 
*/ private static JsonNode convertToJson(Schema schema, Object value) { if (value == null) { + if (schema == null) // Any schema is valid and we don't have a default, so treat this as an optional schema + return null; if (schema.defaultValue() != null) return convertToJson(schema, schema.defaultValue()); if (schema.isOptional()) @@ -386,85 +443,141 @@ public class JsonConverter implements Converter { throw new DataException("Conversion error: null value for field that is required and has no default value"); } - switch (schema.type()) { - case INT8: - return JsonNodeFactory.instance.numberNode((Byte) value); - case INT16: - return JsonNodeFactory.instance.numberNode((Short) value); - case INT32: - return JsonNodeFactory.instance.numberNode((Integer) value); - case INT64: - return JsonNodeFactory.instance.numberNode((Long) value); - case FLOAT32: - return JsonNodeFactory.instance.numberNode((Float) value); - case FLOAT64: - return JsonNodeFactory.instance.numberNode((Double) value); - case BOOLEAN: - return JsonNodeFactory.instance.booleanNode((Boolean) value); - case STRING: - CharSequence charSeq = (CharSequence) value; - return JsonNodeFactory.instance.textNode(charSeq.toString()); - case BYTES: - if (value instanceof byte[]) - return JsonNodeFactory.instance.binaryNode((byte[]) value); - else if (value instanceof ByteBuffer) - return JsonNodeFactory.instance.binaryNode(((ByteBuffer) value).array()); - else - throw new DataException("Invalid type for bytes type: " + value.getClass()); - case ARRAY: { - if (!(value instanceof Collection)) - throw new DataException("Invalid type for array type: " + value.getClass()); - Collection collection = (Collection) value; - ArrayNode list = JsonNodeFactory.instance.arrayNode(); - for (Object elem : collection) { - JsonNode fieldValue = convertToJson(schema.valueSchema(), elem); - list.add(fieldValue); - } - return list; + try { + final Schema.Type schemaType; + if (schema == null) { + schemaType = CopycatSchema.schemaType(value.getClass()); + if (schemaType == null) + throw new DataException("Java class " + value.getClass() + " does not have corresponding schema type."); + } else { + schemaType = schema.type(); } - case MAP: { - if (!(value instanceof Map)) - throw new DataException("Invalid type for array type: " + value.getClass()); - Map map = (Map) value; - // If true, using string keys and JSON object; if false, using non-string keys and Array-encoding - boolean objectMode = schema.keySchema().type() == Schema.Type.STRING; - ObjectNode obj = null; - ArrayNode list = null; - if (objectMode) - obj = JsonNodeFactory.instance.objectNode(); - else - list = JsonNodeFactory.instance.arrayNode(); - for (Map.Entry entry : map.entrySet()) { - JsonNode mapKey = convertToJson(schema.keySchema(), entry.getKey()); - JsonNode mapValue = convertToJson(schema.valueSchema(), entry.getValue()); - - if (objectMode) - obj.set(mapKey.asText(), mapValue); + switch (schemaType) { + case INT8: + return JsonNodeFactory.instance.numberNode((Byte) value); + case INT16: + return JsonNodeFactory.instance.numberNode((Short) value); + case INT32: + return JsonNodeFactory.instance.numberNode((Integer) value); + case INT64: + return JsonNodeFactory.instance.numberNode((Long) value); + case FLOAT32: + return JsonNodeFactory.instance.numberNode((Float) value); + case FLOAT64: + return JsonNodeFactory.instance.numberNode((Double) value); + case BOOLEAN: + return JsonNodeFactory.instance.booleanNode((Boolean) value); + case STRING: + CharSequence charSeq = (CharSequence) value; + 
return JsonNodeFactory.instance.textNode(charSeq.toString()); + case BYTES: + if (value instanceof byte[]) + return JsonNodeFactory.instance.binaryNode((byte[]) value); + else if (value instanceof ByteBuffer) + return JsonNodeFactory.instance.binaryNode(((ByteBuffer) value).array()); else - list.add(JsonNodeFactory.instance.arrayNode().add(mapKey).add(mapValue)); + throw new DataException("Invalid type for bytes type: " + value.getClass()); + case ARRAY: { + Collection collection = (Collection) value; + ArrayNode list = JsonNodeFactory.instance.arrayNode(); + for (Object elem : collection) { + Schema valueSchema = schema == null ? null : schema.valueSchema(); + JsonNode fieldValue = convertToJson(valueSchema, elem); + list.add(fieldValue); + } + return list; } - return objectMode ? obj : list; - } - case STRUCT: { - if (!(value instanceof Struct)) - throw new DataException("Invalid type for struct type: " + value.getClass()); - Struct struct = (Struct) value; - if (struct.schema() != schema) - throw new DataException("Mismatching schema."); - ObjectNode obj = JsonNodeFactory.instance.objectNode(); - for (Field field : schema.fields()) { - obj.set(field.name(), convertToJson(field.schema(), struct.get(field))); - } - return obj; - } - } + case MAP: { + Map map = (Map) value; + // If true, using string keys and JSON object; if false, using non-string keys and Array-encoding + boolean objectMode; + if (schema == null) { + objectMode = true; + for (Map.Entry entry : map.entrySet()) { + if (!(entry.getKey() instanceof String)) { + objectMode = false; + break; + } + } + } else { + objectMode = schema.keySchema().type() == Schema.Type.STRING; + } + ObjectNode obj = null; + ArrayNode list = null; + if (objectMode) + obj = JsonNodeFactory.instance.objectNode(); + else + list = JsonNodeFactory.instance.arrayNode(); + for (Map.Entry entry : map.entrySet()) { + Schema keySchema = schema == null ? null : schema.keySchema(); + Schema valueSchema = schema == null ? null : schema.valueSchema(); + JsonNode mapKey = convertToJson(keySchema, entry.getKey()); + JsonNode mapValue = convertToJson(valueSchema, entry.getValue()); - throw new DataException("Couldn't convert " + value + " to JSON."); + if (objectMode) + obj.set(mapKey.asText(), mapValue); + else + list.add(JsonNodeFactory.instance.arrayNode().add(mapKey).add(mapValue)); + } + return objectMode ? obj : list; + } + case STRUCT: { + Struct struct = (Struct) value; + if (struct.schema() != schema) + throw new DataException("Mismatching schema."); + ObjectNode obj = JsonNodeFactory.instance.objectNode(); + for (Field field : schema.fields()) { + obj.set(field.name(), convertToJson(field.schema(), struct.get(field))); + } + return obj; + } + } + + throw new DataException("Couldn't convert " + value + " to JSON."); + } catch (ClassCastException e) { + throw new DataException("Invalid type for " + schema.type() + ": " + value.getClass()); + } } private static Object convertToCopycat(Schema schema, JsonNode jsonValue) { - JsonToCopycatTypeConverter typeConverter = TO_COPYCAT_CONVERTERS.get(schema.type()); + JsonToCopycatTypeConverter typeConverter; + final Schema.Type schemaType; + if (schema != null) { + schemaType = schema.type(); + } else { + switch (jsonValue.getNodeType()) { + case NULL: + // Special case. 
With no schema + return null; + case BOOLEAN: + schemaType = Schema.Type.BOOLEAN; + break; + case NUMBER: + if (jsonValue.isIntegralNumber()) + schemaType = Schema.Type.INT64; + else + schemaType = Schema.Type.FLOAT64; + break; + case ARRAY: + schemaType = Schema.Type.ARRAY; + break; + case OBJECT: + schemaType = Schema.Type.MAP; + break; + case STRING: + schemaType = Schema.Type.STRING; + break; + + case BINARY: + case MISSING: + case POJO: + default: + schemaType = null; + break; + } + } + typeConverter = TO_COPYCAT_CONVERTERS.get(schemaType); if (typeConverter == null) throw new DataException("Unknown schema type: " + schema.type()); diff --git a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonDeserializer.java b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonDeserializer.java index 29c7bac12ec..16617544086 100644 --- a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonDeserializer.java +++ b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonDeserializer.java @@ -18,9 +18,6 @@ package org.apache.kafka.copycat.json; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.JsonNodeFactory; -import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Deserializer; @@ -31,22 +28,6 @@ import java.util.Map; * structured data without having associated Java classes. This deserializer also supports Copycat schemas. */ public class JsonDeserializer implements Deserializer { - private static final ObjectNode CATCH_ALL_OBJECT_SCHEMA = JsonNodeFactory.instance.objectNode(); - private static final ObjectNode CATCH_ALL_ARRAY_SCHEMA = JsonNodeFactory.instance.objectNode(); - private static final ArrayNode ALL_SCHEMAS_LIST = JsonNodeFactory.instance.arrayNode(); - private static final ObjectNode CATCH_ALL_SCHEMA = JsonNodeFactory.instance.objectNode(); - static { - CATCH_ALL_OBJECT_SCHEMA.put("type", "object") - .putArray("field").add(JsonNodeFactory.instance.objectNode().put("*", "all")); - - CATCH_ALL_ARRAY_SCHEMA.put("type", "array").put("items", "all"); - - ALL_SCHEMAS_LIST.add("boolean").add("int").add("long").add("float").add("double").add("bytes").add("string") - .add(CATCH_ALL_ARRAY_SCHEMA).add(CATCH_ALL_OBJECT_SCHEMA); - - CATCH_ALL_SCHEMA.put("name", "all").set("type", ALL_SCHEMAS_LIST); - } - private ObjectMapper objectMapper = new ObjectMapper(); /** @@ -61,6 +42,9 @@ public class JsonDeserializer implements Deserializer { @Override public JsonNode deserialize(String topic, byte[] bytes) { + if (bytes == null) + return null; + JsonNode data; try { data = objectMapper.readTree(bytes); @@ -68,15 +52,6 @@ public class JsonDeserializer implements Deserializer { throw new SerializationException(e); } - // The deserialized data should either be an envelope object containing the schema and the payload or the schema - // was stripped during serialization and we need to fill in an all-encompassing schema. 
- if (!data.isObject() || data.size() != 2 || !data.has("schema") || !data.has("payload")) { - ObjectNode envelope = JsonNodeFactory.instance.objectNode(); - envelope.set("schema", CATCH_ALL_SCHEMA); - envelope.set("payload", data); - data = envelope; - } - return data; } diff --git a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSerializer.java b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSerializer.java index 80df6be0e92..129d14b3d28 100644 --- a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSerializer.java +++ b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSerializer.java @@ -28,12 +28,7 @@ import java.util.Map; * structured data without corresponding Java classes. This serializer also supports Copycat schemas. */ public class JsonSerializer implements Serializer { - - private static final String SCHEMAS_ENABLE_CONFIG = "schemas.enable"; - private static final boolean SCHEMAS_ENABLE_DEFAULT = true; - private final ObjectMapper objectMapper = new ObjectMapper(); - private boolean enableSchemas = SCHEMAS_ENABLE_DEFAULT; /** * Default constructor needed by Kafka @@ -44,9 +39,6 @@ public class JsonSerializer implements Serializer { @Override public void configure(Map config, boolean isKey) { - Object enableConfigsVal = config.get(SCHEMAS_ENABLE_CONFIG); - if (enableConfigsVal != null) - enableSchemas = enableConfigsVal.toString().equals("true"); } @Override @@ -54,14 +46,7 @@ public class JsonSerializer implements Serializer { if (data == null) return null; - // This serializer works for Copycat data that requires a schema to be included, so we expect it to have a - // specific format: { "schema": {...}, "payload": ... }. - if (!data.isObject() || data.size() != 2 || !data.has("schema") || !data.has("payload")) - throw new SerializationException("JsonSerializer requires \"schema\" and \"payload\" fields and may not contain additional fields"); - try { - if (!enableSchemas) - data = data.get("payload"); return objectMapper.writeValueAsBytes(data); } catch (Exception e) { throw new SerializationException("Error serializing JSON message", e); diff --git a/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java b/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java index ab4a86ef513..214f9ce0833 100644 --- a/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java +++ b/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java @@ -19,19 +19,19 @@ package org.apache.kafka.copycat.json; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import org.apache.kafka.copycat.data.Schema; import org.apache.kafka.copycat.data.SchemaAndValue; import org.apache.kafka.copycat.data.SchemaBuilder; import org.apache.kafka.copycat.data.Struct; +import org.apache.kafka.copycat.errors.DataException; import org.junit.Test; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; +import java.util.*; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -39,6 +39,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; public class JsonConverterTest { + private static final String TOPIC = "topic"; ObjectMapper 
objectMapper = new ObjectMapper(); JsonConverter converter = new JsonConverter(); @@ -48,51 +49,51 @@ public class JsonConverterTest { @Test public void testCopycatSchemaMetadataTranslation() { // this validates the non-type fields are translated and handled properly - assertEquals(new SchemaAndValue(Schema.BOOLEAN_SCHEMA, true), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"boolean\" }, \"payload\": true }"))); - assertEquals(new SchemaAndValue(Schema.OPTIONAL_BOOLEAN_SCHEMA, null), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"boolean\", \"optional\": true }, \"payload\": null }"))); + assertEquals(new SchemaAndValue(Schema.BOOLEAN_SCHEMA, true), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\" }, \"payload\": true }".getBytes())); + assertEquals(new SchemaAndValue(Schema.OPTIONAL_BOOLEAN_SCHEMA, null), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", \"optional\": true }, \"payload\": null }".getBytes())); assertEquals(new SchemaAndValue(SchemaBuilder.bool().defaultValue(true).build(), true), - converter.toCopycatData(parse("{ \"schema\": { \"type\": \"boolean\", \"default\": true }, \"payload\": null }"))); + converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", \"default\": true }, \"payload\": null }".getBytes())); assertEquals(new SchemaAndValue(SchemaBuilder.bool().required().name("bool").version(2).doc("the documentation").build(), true), - converter.toCopycatData(parse("{ \"schema\": { \"type\": \"boolean\", \"optional\": false, \"name\": \"bool\", \"version\": 2, \"doc\": \"the documentation\"}, \"payload\": true }"))); + converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", \"optional\": false, \"name\": \"bool\", \"version\": 2, \"doc\": \"the documentation\"}, \"payload\": true }".getBytes())); } // Schema types @Test public void booleanToCopycat() { - assertEquals(new SchemaAndValue(Schema.BOOLEAN_SCHEMA, true), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"boolean\" }, \"payload\": true }"))); - assertEquals(new SchemaAndValue(Schema.BOOLEAN_SCHEMA, false), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"boolean\" }, \"payload\": false }"))); + assertEquals(new SchemaAndValue(Schema.BOOLEAN_SCHEMA, true), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\" }, \"payload\": true }".getBytes())); + assertEquals(new SchemaAndValue(Schema.BOOLEAN_SCHEMA, false), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\" }, \"payload\": false }".getBytes())); } @Test public void byteToCopycat() { - assertEquals(new SchemaAndValue(Schema.INT8_SCHEMA, (byte) 12), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"int8\" }, \"payload\": 12 }"))); + assertEquals(new SchemaAndValue(Schema.INT8_SCHEMA, (byte) 12), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"int8\" }, \"payload\": 12 }".getBytes())); } @Test public void shortToCopycat() { - assertEquals(new SchemaAndValue(Schema.INT16_SCHEMA, (short) 12), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"int16\" }, \"payload\": 12 }"))); + assertEquals(new SchemaAndValue(Schema.INT16_SCHEMA, (short) 12), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"int16\" }, \"payload\": 12 }".getBytes())); } @Test public void intToCopycat() { - assertEquals(new SchemaAndValue(Schema.INT32_SCHEMA, 12), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"int32\" }, \"payload\": 12 }"))); + assertEquals(new SchemaAndValue(Schema.INT32_SCHEMA, 12), 
converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"int32\" }, \"payload\": 12 }".getBytes())); } @Test public void longToCopycat() { - assertEquals(new SchemaAndValue(Schema.INT64_SCHEMA, 12L), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"int64\" }, \"payload\": 12 }"))); - assertEquals(new SchemaAndValue(Schema.INT64_SCHEMA, 4398046511104L), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"int64\" }, \"payload\": 4398046511104 }"))); + assertEquals(new SchemaAndValue(Schema.INT64_SCHEMA, 12L), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"int64\" }, \"payload\": 12 }".getBytes())); + assertEquals(new SchemaAndValue(Schema.INT64_SCHEMA, 4398046511104L), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"int64\" }, \"payload\": 4398046511104 }".getBytes())); } @Test public void floatToCopycat() { - assertEquals(new SchemaAndValue(Schema.FLOAT32_SCHEMA, 12.34f), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"float\" }, \"payload\": 12.34 }"))); + assertEquals(new SchemaAndValue(Schema.FLOAT32_SCHEMA, 12.34f), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"float\" }, \"payload\": 12.34 }".getBytes())); } @Test public void doubleToCopycat() { - assertEquals(new SchemaAndValue(Schema.FLOAT64_SCHEMA, 12.34), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"double\" }, \"payload\": 12.34 }"))); + assertEquals(new SchemaAndValue(Schema.FLOAT64_SCHEMA, 12.34), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"double\" }, \"payload\": 12.34 }".getBytes())); } @@ -100,69 +101,105 @@ public class JsonConverterTest { public void bytesToCopycat() throws UnsupportedEncodingException { ByteBuffer reference = ByteBuffer.wrap("test-string".getBytes("UTF-8")); String msg = "{ \"schema\": { \"type\": \"bytes\" }, \"payload\": \"dGVzdC1zdHJpbmc=\" }"; - SchemaAndValue schemaAndValue = converter.toCopycatData(parse(msg)); + SchemaAndValue schemaAndValue = converter.toCopycatData(TOPIC, msg.getBytes()); ByteBuffer converted = ByteBuffer.wrap((byte[]) schemaAndValue.value()); assertEquals(reference, converted); } @Test public void stringToCopycat() { - assertEquals(new SchemaAndValue(Schema.STRING_SCHEMA, "foo-bar-baz"), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\" }"))); + assertEquals(new SchemaAndValue(Schema.STRING_SCHEMA, "foo-bar-baz"), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\" }".getBytes())); } @Test public void arrayToCopycat() { - JsonNode arrayJson = parse("{ \"schema\": { \"type\": \"array\", \"items\": { \"type\" : \"int32\" } }, \"payload\": [1, 2, 3] }"); - assertEquals(new SchemaAndValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), Arrays.asList(1, 2, 3)), converter.toCopycatData(arrayJson)); + byte[] arrayJson = "{ \"schema\": { \"type\": \"array\", \"items\": { \"type\" : \"int32\" } }, \"payload\": [1, 2, 3] }".getBytes(); + assertEquals(new SchemaAndValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), Arrays.asList(1, 2, 3)), converter.toCopycatData(TOPIC, arrayJson)); } @Test public void mapToCopycatStringKeys() { - JsonNode mapJson = parse("{ \"schema\": { \"type\": \"map\", \"keys\": { \"type\" : \"string\" }, \"values\": { \"type\" : \"int32\" } }, \"payload\": { \"key1\": 12, \"key2\": 15} }"); + byte[] mapJson = "{ \"schema\": { \"type\": \"map\", \"keys\": { \"type\" : \"string\" }, \"values\": { \"type\" : \"int32\" } }, \"payload\": { \"key1\": 12, \"key2\": 15} 
}".getBytes(); Map expected = new HashMap<>(); expected.put("key1", 12); expected.put("key2", 15); - assertEquals(new SchemaAndValue(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build(), expected), converter.toCopycatData(mapJson)); + assertEquals(new SchemaAndValue(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build(), expected), converter.toCopycatData(TOPIC, mapJson)); } @Test public void mapToCopycatNonStringKeys() { - JsonNode mapJson = parse("{ \"schema\": { \"type\": \"map\", \"keys\": { \"type\" : \"int32\" }, \"values\": { \"type\" : \"int32\" } }, \"payload\": [ [1, 12], [2, 15] ] }"); + byte[] mapJson = "{ \"schema\": { \"type\": \"map\", \"keys\": { \"type\" : \"int32\" }, \"values\": { \"type\" : \"int32\" } }, \"payload\": [ [1, 12], [2, 15] ] }".getBytes(); Map expected = new HashMap<>(); expected.put(1, 12); expected.put(2, 15); - assertEquals(new SchemaAndValue(SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).build(), expected), converter.toCopycatData(mapJson)); + assertEquals(new SchemaAndValue(SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).build(), expected), converter.toCopycatData(TOPIC, mapJson)); } @Test public void structToCopycat() { - JsonNode structJson = parse("{ \"schema\": { \"type\": \"struct\", \"fields\": [{ \"field\": \"field1\", \"type\": \"boolean\" }, { \"field\": \"field2\", \"type\": \"string\" }] }, \"payload\": { \"field1\": true, \"field2\": \"string\" } }"); + byte[] structJson = "{ \"schema\": { \"type\": \"struct\", \"fields\": [{ \"field\": \"field1\", \"type\": \"boolean\" }, { \"field\": \"field2\", \"type\": \"string\" }] }, \"payload\": { \"field1\": true, \"field2\": \"string\" } }".getBytes(); Schema expectedSchema = SchemaBuilder.struct().field("field1", Schema.BOOLEAN_SCHEMA).field("field2", Schema.STRING_SCHEMA).build(); Struct expected = new Struct(expectedSchema).put("field1", true).put("field2", "string"); - SchemaAndValue converted = converter.toCopycatData(structJson); + SchemaAndValue converted = converter.toCopycatData(TOPIC, structJson); assertEquals(new SchemaAndValue(expectedSchema, expected), converted); } + @Test(expected = DataException.class) + public void nullToCopycat() { + // When schemas are enabled, trying to decode a null should be an error -- we should *always* have the envelope + assertEquals(SchemaAndValue.NULL, converter.toCopycatData(TOPIC, null)); + } + + @Test + public void nullSchemaPrimitiveToCopycat() { + SchemaAndValue converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, \"payload\": null }".getBytes()); + assertEquals(SchemaAndValue.NULL, converted); + + converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, \"payload\": true }".getBytes()); + assertEquals(new SchemaAndValue(null, true), converted); + + // Integers: Copycat has more data types, and JSON unfortunately mixes all number types. 
We try to preserve + // info as best we can, so we always use the largest integer and floating point numbers we can and have Jackson + // determine if it's an integer or not + converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, \"payload\": 12 }".getBytes()); + assertEquals(new SchemaAndValue(null, 12L), converted); + + converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, \"payload\": 12.24 }".getBytes()); + assertEquals(new SchemaAndValue(null, 12.24), converted); + + converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, \"payload\": \"a string\" }".getBytes()); + assertEquals(new SchemaAndValue(null, "a string"), converted); + + converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, \"payload\": [1, \"2\", 3] }".getBytes()); + assertEquals(new SchemaAndValue(null, Arrays.asList(1L, "2", 3L)), converted); + + converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, \"payload\": { \"field1\": 1, \"field2\": 2} }".getBytes()); + Map obj = new HashMap<>(); + obj.put("field1", 1L); + obj.put("field2", 2L); + assertEquals(new SchemaAndValue(null, obj), converted); + } + // Schema metadata @Test public void testJsonSchemaMetadataTranslation() { - JsonNode converted = converter.fromCopycatData(Schema.BOOLEAN_SCHEMA, true); + JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.BOOLEAN_SCHEMA, true)); validateEnvelope(converted); assertEquals(parse("{ \"type\": \"boolean\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); assertEquals(true, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue()); - converted = converter.fromCopycatData(Schema.OPTIONAL_BOOLEAN_SCHEMA, null); + converted = parse(converter.fromCopycatData(TOPIC, Schema.OPTIONAL_BOOLEAN_SCHEMA, null)); validateEnvelope(converted); assertEquals(parse("{ \"type\": \"boolean\", \"optional\": true }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); assertTrue(converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).isNull()); - converted = converter.fromCopycatData(SchemaBuilder.bool().defaultValue(true).build(), true); + converted = parse(converter.fromCopycatData(TOPIC, SchemaBuilder.bool().defaultValue(true).build(), true)); validateEnvelope(converted); assertEquals(parse("{ \"type\": \"boolean\", \"optional\": false, \"default\": true }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); assertEquals(true, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue()); - converted = converter.fromCopycatData(SchemaBuilder.bool().required().name("bool").version(3).doc("the documentation").build(), true); + converted = parse(converter.fromCopycatData(TOPIC, SchemaBuilder.bool().required().name("bool").version(3).doc("the documentation").build(), true)); validateEnvelope(converted); assertEquals(parse("{ \"type\": \"boolean\", \"optional\": false, \"name\": \"bool\", \"version\": 3, \"doc\": \"the documentation\"}"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); @@ -173,7 +210,7 @@ public class JsonConverterTest { @Test public void booleanToJson() { - JsonNode converted = converter.fromCopycatData(Schema.BOOLEAN_SCHEMA, true); + JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.BOOLEAN_SCHEMA, true)); validateEnvelope(converted); assertEquals(parse("{ \"type\": \"boolean\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); assertEquals(true, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue()); @@ -181,7 +218,7 @@ public class 
JsonConverterTest { @Test public void byteToJson() { - JsonNode converted = converter.fromCopycatData(Schema.INT8_SCHEMA, (byte) 12); + JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.INT8_SCHEMA, (byte) 12)); validateEnvelope(converted); assertEquals(parse("{ \"type\": \"int8\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); assertEquals(12, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).intValue()); @@ -189,7 +226,7 @@ public class JsonConverterTest { @Test public void shortToJson() { - JsonNode converted = converter.fromCopycatData(Schema.INT16_SCHEMA, (short) 12); + JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.INT16_SCHEMA, (short) 12)); validateEnvelope(converted); assertEquals(parse("{ \"type\": \"int16\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); assertEquals(12, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).intValue()); @@ -197,7 +234,7 @@ public class JsonConverterTest { @Test public void intToJson() { - JsonNode converted = converter.fromCopycatData(Schema.INT32_SCHEMA, 12); + JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.INT32_SCHEMA, 12)); validateEnvelope(converted); assertEquals(parse("{ \"type\": \"int32\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); assertEquals(12, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).intValue()); @@ -205,7 +242,7 @@ public class JsonConverterTest { @Test public void longToJson() { - JsonNode converted = converter.fromCopycatData(Schema.INT64_SCHEMA, 4398046511104L); + JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.INT64_SCHEMA, 4398046511104L)); validateEnvelope(converted); assertEquals(parse("{ \"type\": \"int64\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); assertEquals(4398046511104L, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).longValue()); @@ -213,7 +250,7 @@ public class JsonConverterTest { @Test public void floatToJson() { - JsonNode converted = converter.fromCopycatData(Schema.FLOAT32_SCHEMA, 12.34f); + JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.FLOAT32_SCHEMA, 12.34f)); validateEnvelope(converted); assertEquals(parse("{ \"type\": \"float\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); assertEquals(12.34f, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).floatValue(), 0.001); @@ -221,7 +258,7 @@ public class JsonConverterTest { @Test public void doubleToJson() { - JsonNode converted = converter.fromCopycatData(Schema.FLOAT64_SCHEMA, 12.34); + JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.FLOAT64_SCHEMA, 12.34)); validateEnvelope(converted); assertEquals(parse("{ \"type\": \"double\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); assertEquals(12.34, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).doubleValue(), 0.001); @@ -229,7 +266,7 @@ public class JsonConverterTest { @Test public void bytesToJson() throws IOException { - JsonNode converted = converter.fromCopycatData(Schema.BYTES_SCHEMA, "test-string".getBytes()); + JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.BYTES_SCHEMA, "test-string".getBytes())); validateEnvelope(converted); assertEquals(parse("{ \"type\": \"bytes\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); assertEquals(ByteBuffer.wrap("test-string".getBytes()), @@ -238,7 +275,7 @@ public class 
JsonConverterTest { @Test public void stringToJson() { - JsonNode converted = converter.fromCopycatData(Schema.STRING_SCHEMA, "test-string"); + JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.STRING_SCHEMA, "test-string")); validateEnvelope(converted); assertEquals(parse("{ \"type\": \"string\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); assertEquals("test-string", converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).textValue()); @@ -247,7 +284,7 @@ public class JsonConverterTest { @Test public void arrayToJson() { Schema int32Array = SchemaBuilder.array(Schema.INT32_SCHEMA).build(); - JsonNode converted = converter.fromCopycatData(int32Array, Arrays.asList(1, 2, 3)); + JsonNode converted = parse(converter.fromCopycatData(TOPIC, int32Array, Arrays.asList(1, 2, 3))); validateEnvelope(converted); assertEquals(parse("{ \"type\": \"array\", \"items\": { \"type\": \"int32\", \"optional\": false }, \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); @@ -261,7 +298,7 @@ public class JsonConverterTest { Map input = new HashMap<>(); input.put("key1", 12); input.put("key2", 15); - JsonNode converted = converter.fromCopycatData(stringIntMap, input); + JsonNode converted = parse(converter.fromCopycatData(TOPIC, stringIntMap, input)); validateEnvelope(converted); assertEquals(parse("{ \"type\": \"map\", \"keys\": { \"type\" : \"string\", \"optional\": false }, \"values\": { \"type\" : \"int32\", \"optional\": false }, \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); @@ -275,21 +312,28 @@ public class JsonConverterTest { Map input = new HashMap<>(); input.put(1, 12); input.put(2, 15); - JsonNode converted = converter.fromCopycatData(intIntMap, input); + JsonNode converted = parse(converter.fromCopycatData(TOPIC, intIntMap, input)); validateEnvelope(converted); assertEquals(parse("{ \"type\": \"map\", \"keys\": { \"type\" : \"int32\", \"optional\": false }, \"values\": { \"type\" : \"int32\", \"optional\": false }, \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); - assertEquals(JsonNodeFactory.instance.arrayNode() - .add(JsonNodeFactory.instance.arrayNode().add(1).add(12)) - .add(JsonNodeFactory.instance.arrayNode().add(2).add(15)), - converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)); + + assertTrue(converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).isArray()); + ArrayNode payload = (ArrayNode) converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME); + assertEquals(2, payload.size()); + Set payloadEntries = new HashSet<>(); + for (JsonNode elem : payload) + payloadEntries.add(elem); + assertEquals(new HashSet<>(Arrays.asList(JsonNodeFactory.instance.arrayNode().add(1).add(12), + JsonNodeFactory.instance.arrayNode().add(2).add(15))), + payloadEntries + ); } @Test public void structToJson() { Schema schema = SchemaBuilder.struct().field("field1", Schema.BOOLEAN_SCHEMA).field("field2", Schema.STRING_SCHEMA).build(); Struct input = new Struct(schema).put("field1", true).put("field2", "string"); - JsonNode converted = converter.fromCopycatData(schema, input); + JsonNode converted = parse(converter.fromCopycatData(TOPIC, schema, input)); validateEnvelope(converted); assertEquals(parse("{ \"type\": \"struct\", \"optional\": false, \"fields\": [{ \"field\": \"field1\", \"type\": \"boolean\", \"optional\": false }, { \"field\": \"field2\", \"type\": \"string\", \"optional\": false }] }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); @@ -299,6 +343,102 
@@ public class JsonConverterTest { converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)); } + + @Test + public void nullSchemaAndPrimitiveToJson() { + // This still needs to do conversion of data, null schema means "anything goes" + JsonNode converted = parse(converter.fromCopycatData(TOPIC, null, true)); + validateEnvelopeNullSchema(converted); + assertTrue(converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isNull()); + assertEquals(true, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue()); + } + + @Test + public void nullSchemaAndArrayToJson() { + // This still needs to do conversion of data, null schema means "anything goes". Make sure we mix and match + // types to verify conversion still works. + JsonNode converted = parse(converter.fromCopycatData(TOPIC, null, Arrays.asList(1, "string", true))); + validateEnvelopeNullSchema(converted); + assertTrue(converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isNull()); + assertEquals(JsonNodeFactory.instance.arrayNode().add(1).add("string").add(true), + converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)); + } + + @Test + public void nullSchemaAndMapToJson() { + // This still needs to do conversion of data, null schema means "anything goes". Make sure we mix and match + // types to verify conversion still works. + Map input = new HashMap<>(); + input.put("key1", 12); + input.put("key2", "string"); + input.put("key3", true); + JsonNode converted = parse(converter.fromCopycatData(TOPIC, null, input)); + validateEnvelopeNullSchema(converted); + assertTrue(converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isNull()); + assertEquals(JsonNodeFactory.instance.objectNode().put("key1", 12).put("key2", "string").put("key3", true), + converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)); + } + + @Test + public void nullSchemaAndMapNonStringKeysToJson() { + // This still needs to do conversion of data, null schema means "anything goes". Make sure we mix and match + // types to verify conversion still works. 
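For reference, the envelopes these null-schema tests expect have the shape { "schema": null, "payload": ... }: a map with string keys serializes as a JSON object, while a map with non-string keys, as exercised in the test below, is written as an array of [key, value] pairs, since JSON object keys must be strings. A small sketch along those lines, assuming the JsonConverter from this patch; the class name, topic, and values are illustrative only:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.copycat.json.JsonConverter;
import org.apache.kafka.copycat.storage.Converter;

// Illustrative sketch only; not part of the patch.
public class NullSchemaMapSketch {
    public static void main(String[] args) throws Exception {
        Converter converter = new JsonConverter();
        converter.configure(new HashMap<String, Object>(), false); // defaults apply, schemas.enable=true

        Map<Object, Object> mixedKeys = new HashMap<>();
        mixedKeys.put("string", 12);
        mixedKeys.put(52, "string");

        // A null schema still produces the envelope. Because JSON object keys must be strings,
        // a map with non-string keys is encoded as an array of [key, value] pairs, for example
        // {"schema":null,"payload":[["string",12],[52,"string"]]} (entry order is not guaranteed).
        byte[] serialized = converter.fromCopycatData("example-topic", null, mixedKeys);
        System.out.println(new String(serialized, "UTF-8"));
    }
}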
+ Map input = new HashMap<>(); + input.put("string", 12); + input.put(52, "string"); + input.put(false, true); + JsonNode converted = parse(converter.fromCopycatData(TOPIC, null, input)); + validateEnvelopeNullSchema(converted); + assertTrue(converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isNull()); + assertTrue(converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).isArray()); + ArrayNode payload = (ArrayNode) converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME); + assertEquals(3, payload.size()); + Set payloadEntries = new HashSet<>(); + for (JsonNode elem : payload) + payloadEntries.add(elem); + assertEquals(new HashSet<>(Arrays.asList(JsonNodeFactory.instance.arrayNode().add("string").add(12), + JsonNodeFactory.instance.arrayNode().add(52).add("string"), + JsonNodeFactory.instance.arrayNode().add(false).add(true))), + payloadEntries + ); + } + + + @Test(expected = DataException.class) + public void mismatchSchemaJson() { + // If we have mismatching schema info, we should properly convert to a DataException + converter.fromCopycatData(TOPIC, Schema.FLOAT64_SCHEMA, true); + } + + + + @Test + public void noSchemaToCopycat() { + Map props = Collections.singletonMap("schemas.enable", false); + converter.configure(props, true); + assertEquals(new SchemaAndValue(null, true), converter.toCopycatData(TOPIC, "true".getBytes())); + } + + @Test + public void noSchemaToJson() { + Map props = Collections.singletonMap("schemas.enable", false); + converter.configure(props, true); + JsonNode converted = parse(converter.fromCopycatData(TOPIC, null, true)); + assertTrue(converted.isBoolean()); + assertEquals(true, converted.booleanValue()); + } + + + + private JsonNode parse(byte[] json) { + try { + return objectMapper.readTree(json); + } catch (IOException e) { + fail("IOException during JSON parse: " + e.getMessage()); + throw new RuntimeException("failed"); + } + } + private JsonNode parse(String json) { try { return objectMapper.readTree(json); @@ -316,4 +456,13 @@ public class JsonConverterTest { assertTrue(env.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isObject()); assertTrue(env.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)); } + + private void validateEnvelopeNullSchema(JsonNode env) { + assertNotNull(env); + assertTrue(env.isObject()); + assertEquals(2, env.size()); + assertTrue(env.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); + assertTrue(env.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isNull()); + assertTrue(env.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)); + } } diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java index 23cdf4dfd51..a976d9059c9 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java @@ -32,8 +32,7 @@ import java.util.Properties; public class WorkerConfig extends AbstractConfig { public static final String CLUSTER_CONFIG = "cluster"; - private static final String - CLUSTER_CONFIG_DOC = + private static final String CLUSTER_CONFIG_DOC = "ID for this cluster, which is used to provide a namespace so multiple Copycat clusters " + "or instances may co-exist while sharing a single Kafka cluster."; public static final String CLUSTER_DEFAULT = "copycat"; @@ -58,21 +57,13 @@ public class WorkerConfig extends AbstractConfig { public static final String VALUE_CONVERTER_CLASS_DOC = "Converter class for value Copycat data that implements the Converter 
interface."; - public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer"; - public static final String KEY_SERIALIZER_CLASS_DOC = - "Serializer class for key that implements the Serializer interface."; + public static final String OFFSET_KEY_CONVERTER_CLASS_CONFIG = "offset.key.converter"; + public static final String OFFSET_KEY_CONVERTER_CLASS_DOC = + "Converter class for offset key Copycat data that implements the Converter interface."; - public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer"; - public static final String VALUE_SERIALIZER_CLASS_DOC = - "Serializer class for value that implements the Serializer interface."; - - public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer"; - public static final String KEY_DESERIALIZER_CLASS_DOC = - "Serializer class for key that implements the Deserializer interface."; - - public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer"; - public static final String VALUE_DESERIALIZER_CLASS_DOC = - "Deserializer class for value that implements the Deserializer interface."; + public static final String OFFSET_VALUE_CONVERTER_CLASS_CONFIG = "offset.value.converter"; + public static final String OFFSET_VALUE_CONVERTER_CLASS_DOC = + "Converter class for offset value Copycat data that implements the Converter interface."; public static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG = "task.shutdown.graceful.timeout.ms"; @@ -104,14 +95,10 @@ public class WorkerConfig extends AbstractConfig { Importance.HIGH, KEY_CONVERTER_CLASS_DOC) .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_CONVERTER_CLASS_DOC) - .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS, - Importance.HIGH, KEY_SERIALIZER_CLASS_DOC) - .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS, - Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC) - .define(KEY_DESERIALIZER_CLASS_CONFIG, Type.CLASS, - Importance.HIGH, KEY_DESERIALIZER_CLASS_DOC) - .define(VALUE_DESERIALIZER_CLASS_CONFIG, Type.CLASS, - Importance.HIGH, VALUE_DESERIALIZER_CLASS_DOC) + .define(OFFSET_KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, + Importance.HIGH, OFFSET_KEY_CONVERTER_CLASS_DOC) + .define(OFFSET_VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, + Importance.HIGH, OFFSET_VALUE_CONVERTER_CLASS_DOC) .define(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Type.LONG, TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT, Importance.LOW, TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC) diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java index 704470a4b13..6cbce0b251e 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java @@ -17,8 +17,6 @@ package org.apache.kafka.copycat.runtime; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.clients.producer.KafkaProducer; @@ -47,34 +45,36 @@ import java.util.Properties; * Since each task has a dedicated thread, this is mainly just a container for them. *
*/ -public class Worker { +public class Worker { private static final Logger log = LoggerFactory.getLogger(Worker.class); private Time time; private WorkerConfig config; - private Converter keyConverter; - private Converter valueConverter; + private Converter keyConverter; + private Converter valueConverter; + private Converter offsetKeyConverter; + private Converter offsetValueConverter; private OffsetBackingStore offsetBackingStore; - private Serializer offsetKeySerializer; - private Serializer offsetValueSerializer; - private Deserializer offsetKeyDeserializer; - private Deserializer offsetValueDeserializer; private HashMap tasks = new HashMap<>(); - private KafkaProducer producer; + private KafkaProducer producer; private SourceTaskOffsetCommitter sourceTaskOffsetCommitter; public Worker(WorkerConfig config) { - this(new SystemTime(), config, null, null, null, null, null); + this(new SystemTime(), config, null); } @SuppressWarnings("unchecked") - public Worker(Time time, WorkerConfig config, OffsetBackingStore offsetBackingStore, - Serializer offsetKeySerializer, Serializer offsetValueSerializer, - Deserializer offsetKeyDeserializer, Deserializer offsetValueDeserializer) { + public Worker(Time time, WorkerConfig config, OffsetBackingStore offsetBackingStore) { this.time = time; this.config = config; this.keyConverter = config.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class); + this.keyConverter.configure(config.originalsWithPrefix("key.converter."), true); this.valueConverter = config.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class); + this.valueConverter.configure(config.originalsWithPrefix("value.converter."), false); + this.offsetKeyConverter = config.getConfiguredInstance(WorkerConfig.OFFSET_KEY_CONVERTER_CLASS_CONFIG, Converter.class); + this.offsetKeyConverter.configure(config.originalsWithPrefix("offset.key.converter."), true); + this.offsetValueConverter = config.getConfiguredInstance(WorkerConfig.OFFSET_VALUE_CONVERTER_CLASS_CONFIG, Converter.class); + this.offsetValueConverter.configure(config.originalsWithPrefix("offset.value.converter."), false); if (offsetBackingStore != null) { this.offsetBackingStore = offsetBackingStore; @@ -82,34 +82,6 @@ public class Worker { this.offsetBackingStore = new FileOffsetBackingStore(); this.offsetBackingStore.configure(config.originals()); } - - if (offsetKeySerializer != null) { - this.offsetKeySerializer = offsetKeySerializer; - } else { - this.offsetKeySerializer = config.getConfiguredInstance(WorkerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class); - this.offsetKeySerializer.configure(config.originals(), true); - } - - if (offsetValueSerializer != null) { - this.offsetValueSerializer = offsetValueSerializer; - } else { - this.offsetValueSerializer = config.getConfiguredInstance(WorkerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class); - this.offsetValueSerializer.configure(config.originals(), false); - } - - if (offsetKeyDeserializer != null) { - this.offsetKeyDeserializer = offsetKeyDeserializer; - } else { - this.offsetKeyDeserializer = config.getConfiguredInstance(WorkerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class); - this.offsetKeyDeserializer.configure(config.originals(), true); - } - - if (offsetValueDeserializer != null) { - this.offsetValueDeserializer = offsetValueDeserializer; - } else { - this.offsetValueDeserializer = config.getConfiguredInstance(WorkerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class); - 
this.offsetValueDeserializer.configure(config.originals(), false); - } } public void start() { @@ -119,8 +91,8 @@ public class Worker { Map producerProps = new HashMap<>(); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ",")); - producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getClass(WorkerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName()); - producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getClass(WorkerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName()); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); for (String propName : unusedConfigs.stringPropertyNames()) { producerProps.put(propName, unusedConfigs.getProperty(propName)); } @@ -188,14 +160,14 @@ public class Worker { final WorkerTask workerTask; if (task instanceof SourceTask) { SourceTask sourceTask = (SourceTask) task; - OffsetStorageReader offsetReader = new OffsetStorageReaderImpl<>(offsetBackingStore, id.connector(), - keyConverter, valueConverter, offsetKeySerializer, offsetValueDeserializer); - OffsetStorageWriter offsetWriter = new OffsetStorageWriter<>(offsetBackingStore, id.connector(), - keyConverter, valueConverter, offsetKeySerializer, offsetValueSerializer); - workerTask = new WorkerSourceTask<>(id, sourceTask, keyConverter, valueConverter, producer, + OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(), + offsetKeyConverter, offsetValueConverter); + OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(), + offsetKeyConverter, offsetValueConverter); + workerTask = new WorkerSourceTask(id, sourceTask, keyConverter, valueConverter, producer, offsetReader, offsetWriter, config, time); } else if (task instanceof SinkTask) { - workerTask = new WorkerSinkTask<>(id, (SinkTask) task, config, keyConverter, valueConverter, time); + workerTask = new WorkerSinkTask(id, (SinkTask) task, config, keyConverter, valueConverter, time); } else { log.error("Tasks must be a subclass of either SourceTask or SinkTask", task); throw new CopycatException("Tasks must be a subclass of either SourceTask or SinkTask"); diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java index dfb1f963325..7e71fb87c5d 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java @@ -38,21 +38,21 @@ import java.util.concurrent.TimeUnit; /** * WorkerTask that uses a SinkTask to export data from Kafka. 
*/ -class WorkerSinkTask implements WorkerTask { +class WorkerSinkTask implements WorkerTask { private static final Logger log = LoggerFactory.getLogger(WorkerSinkTask.class); private final ConnectorTaskId id; private final SinkTask task; private final WorkerConfig workerConfig; private final Time time; - private final Converter keyConverter; - private final Converter valueConverter; + private final Converter keyConverter; + private final Converter valueConverter; private WorkerSinkTaskThread workThread; - private KafkaConsumer consumer; + private KafkaConsumer consumer; private final SinkTaskContext context; public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConfig, - Converter keyConverter, Converter valueConverter, Time time) { + Converter keyConverter, Converter valueConverter, Time time) { this.id = id; this.task = task; this.workerConfig = workerConfig; @@ -107,7 +107,7 @@ class WorkerSinkTask implements WorkerTask { public void poll(long timeoutMs) { try { log.trace("{} polling consumer with timeout {} ms", id, timeoutMs); - ConsumerRecords msgs = consumer.poll(timeoutMs); + ConsumerRecords msgs = consumer.poll(timeoutMs); log.trace("{} polling returned {} messages", id, msgs.count()); deliverMessages(msgs); } catch (ConsumerWakeupException we) { @@ -154,7 +154,7 @@ class WorkerSinkTask implements WorkerTask { return workerConfig; } - private KafkaConsumer createConsumer(Properties taskProps) { + private KafkaConsumer createConsumer(Properties taskProps) { String topicsStr = taskProps.getProperty(SinkTask.TOPICS_CONFIG); if (topicsStr == null || topicsStr.isEmpty()) throw new CopycatException("Sink tasks require a list of topics."); @@ -168,12 +168,10 @@ class WorkerSinkTask implements WorkerTask { Utils.join(workerConfig.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ",")); props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - workerConfig.getClass(WorkerConfig.KEY_DESERIALIZER_CLASS_CONFIG).getName()); - props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - workerConfig.getClass(WorkerConfig.VALUE_DESERIALIZER_CLASS_CONFIG).getName()); + props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - KafkaConsumer newConsumer; + KafkaConsumer newConsumer; try { newConsumer = new KafkaConsumer<>(props); } catch (Throwable t) { @@ -202,14 +200,14 @@ class WorkerSinkTask implements WorkerTask { return new WorkerSinkTaskThread(this, "WorkerSinkTask-" + id, time, workerConfig); } - private void deliverMessages(ConsumerRecords msgs) { + private void deliverMessages(ConsumerRecords msgs) { // Finally, deliver this batch to the sink if (msgs.count() > 0) { List records = new ArrayList<>(); - for (ConsumerRecord msg : msgs) { + for (ConsumerRecord msg : msgs) { log.trace("Consuming message with key {}, value {}", msg.key(), msg.value()); - SchemaAndValue keyAndSchema = msg.key() != null ? keyConverter.toCopycatData(msg.key()) : SchemaAndValue.NULL; - SchemaAndValue valueAndSchema = msg.value() != null ? 
valueConverter.toCopycatData(msg.value()) : SchemaAndValue.NULL; + SchemaAndValue keyAndSchema = keyConverter.toCopycatData(msg.topic(), msg.key()); + SchemaAndValue valueAndSchema = valueConverter.toCopycatData(msg.topic(), msg.value()); records.add( new SinkRecord(msg.topic(), msg.partition(), keyAndSchema.schema(), keyAndSchema.value(), diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java index 14b9c3ab406..ee0a532fb30 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java @@ -23,7 +23,6 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.copycat.cli.WorkerConfig; -import org.apache.kafka.copycat.data.SchemaAndValue; import org.apache.kafka.copycat.source.SourceRecord; import org.apache.kafka.copycat.source.SourceTask; import org.apache.kafka.copycat.source.SourceTaskContext; @@ -46,33 +45,31 @@ import java.util.concurrent.TimeoutException; /** * WorkerTask that uses a SourceTask to ingest data into Kafka. */ -class WorkerSourceTask implements WorkerTask { +class WorkerSourceTask implements WorkerTask { private static final Logger log = LoggerFactory.getLogger(WorkerSourceTask.class); private ConnectorTaskId id; private SourceTask task; - private final Converter keyConverter; - private final Converter valueConverter; - private KafkaProducer producer; + private final Converter keyConverter; + private final Converter valueConverter; + private KafkaProducer producer; private WorkerSourceTaskThread workThread; private OffsetStorageReader offsetReader; - private OffsetStorageWriter offsetWriter; + private OffsetStorageWriter offsetWriter; private final WorkerConfig workerConfig; private final Time time; // Use IdentityHashMap to ensure correctness with duplicate records. This is a HashMap because // there is no IdentityHashSet. - private IdentityHashMap, ProducerRecord> - outstandingMessages; + private IdentityHashMap, ProducerRecord> outstandingMessages; // A second buffer is used while an offset flush is running - private IdentityHashMap, ProducerRecord> - outstandingMessagesBacklog; + private IdentityHashMap, ProducerRecord> outstandingMessagesBacklog; private boolean flushing; public WorkerSourceTask(ConnectorTaskId id, SourceTask task, - Converter keyConverter, Converter valueConverter, - KafkaProducer producer, - OffsetStorageReader offsetReader, OffsetStorageWriter offsetWriter, + Converter keyConverter, Converter valueConverter, + KafkaProducer producer, + OffsetStorageReader offsetReader, OffsetStorageWriter offsetWriter, WorkerConfig workerConfig, Time time) { this.id = id; this.task = task; @@ -132,10 +129,9 @@ class WorkerSourceTask implements WorkerTask { */ private synchronized void sendRecords(List records) { for (SourceRecord record : records) { - K key = (record.keySchema() != null) ? keyConverter.fromCopycatData(record.keySchema(), record.key()) : null; - V value = (record.valueSchema() != null) ? 
valueConverter.fromCopycatData(record.valueSchema(), record.value()) : null; - final ProducerRecord producerRecord = new ProducerRecord<>( - record.topic(), record.kafkaPartition(), key, value); + byte[] key = keyConverter.fromCopycatData(record.topic(), record.keySchema(), record.key()); + byte[] value = valueConverter.fromCopycatData(record.topic(), record.valueSchema(), record.value()); + final ProducerRecord producerRecord = new ProducerRecord<>(record.topic(), record.kafkaPartition(), key, value); log.trace("Appending record with key {}, value {}", record.key(), record.value()); if (!flushing) { outstandingMessages.put(producerRecord, producerRecord); @@ -158,13 +154,12 @@ class WorkerSourceTask implements WorkerTask { } }); // Offsets are converted & serialized in the OffsetWriter - offsetWriter.offset(new SchemaAndValue(record.sourcePartitionSchema(), record.sourcePartition()), - new SchemaAndValue(record.sourceOffsetSchema(), record.sourceOffset())); + offsetWriter.offset(record.sourcePartition(), record.sourceOffset()); } } - private synchronized void recordSent(final ProducerRecord record) { - ProducerRecord removed = outstandingMessages.remove(record); + private synchronized void recordSent(final ProducerRecord record) { + ProducerRecord removed = outstandingMessages.remove(record); // While flushing, we may also see callbacks for items in the backlog if (removed == null && flushing) removed = outstandingMessagesBacklog.remove(record); @@ -276,7 +271,7 @@ class WorkerSourceTask implements WorkerTask { private void finishSuccessfulFlush() { // If we were successful, we can just swap instead of replacing items back into the original map - IdentityHashMap, ProducerRecord> temp = outstandingMessages; + IdentityHashMap, ProducerRecord> temp = outstandingMessages; outstandingMessages = outstandingMessagesBacklog; outstandingMessagesBacklog = temp; flushing = false; diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java index 237eda63113..7521955abc0 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java @@ -17,8 +17,6 @@ package org.apache.kafka.copycat.storage; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.copycat.data.SchemaAndValue; import org.apache.kafka.copycat.errors.CopycatException; import org.slf4j.Logger; @@ -35,39 +33,36 @@ import java.util.Map; * directly, the interface is only separate from this implementation because it needs to be * included in the public API package. 
*/ -public class OffsetStorageReaderImpl implements OffsetStorageReader { +public class OffsetStorageReaderImpl implements OffsetStorageReader { private static final Logger log = LoggerFactory.getLogger(OffsetStorageReaderImpl.class); private final OffsetBackingStore backingStore; private final String namespace; - private final Converter keyConverter; - private final Converter valueConverter; - private final Serializer keySerializer; - private final Deserializer valueDeserializer; + private final Converter keyConverter; + private final Converter valueConverter; public OffsetStorageReaderImpl(OffsetBackingStore backingStore, String namespace, - Converter keyConverter, Converter valueConverter, - Serializer keySerializer, Deserializer valueDeserializer) { + Converter keyConverter, Converter valueConverter) { this.backingStore = backingStore; this.namespace = namespace; this.keyConverter = keyConverter; this.valueConverter = valueConverter; - this.keySerializer = keySerializer; - this.valueDeserializer = valueDeserializer; } @Override - public SchemaAndValue offset(SchemaAndValue partition) { + public Map offset(Map partition) { return offsets(Arrays.asList(partition)).get(partition); } @Override - public Map offsets(Collection partitions) { + public Map, Map> offsets(Collection> partitions) { // Serialize keys so backing store can work with them - Map serializedToOriginal = new HashMap<>(partitions.size()); - for (SchemaAndValue key : partitions) { + Map> serializedToOriginal = new HashMap<>(partitions.size()); + for (Map key : partitions) { try { - byte[] keySerialized = keySerializer.serialize(namespace, keyConverter.fromCopycatData(key.schema(), key.value())); + // Offsets are treated as schemaless, their format is only validated here (and the returned value below) + OffsetUtils.validateFormat(key); + byte[] keySerialized = keyConverter.fromCopycatData(namespace, null, key); ByteBuffer keyBuffer = (keySerialized != null) ? ByteBuffer.wrap(keySerialized) : null; serializedToOriginal.put(keyBuffer, key); } catch (Throwable t) { @@ -87,7 +82,7 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader { } // Deserialize all the values and map back to the original keys - Map result = new HashMap<>(partitions.size()); + Map, Map> result = new HashMap<>(partitions.size()); for (Map.Entry rawEntry : raw.entrySet()) { try { // Since null could be a valid key, explicitly check whether map contains the key @@ -96,12 +91,12 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader { + "store may have returned invalid data", rawEntry.getKey()); continue; } - SchemaAndValue origKey = serializedToOriginal.get(rawEntry.getKey()); - SchemaAndValue deserializedValue = valueConverter.toCopycatData( - valueDeserializer.deserialize(namespace, rawEntry.getValue().array()) - ); + Map origKey = serializedToOriginal.get(rawEntry.getKey()); + SchemaAndValue deserializedSchemaAndValue = valueConverter.toCopycatData(namespace, rawEntry.getValue().array()); + Object deserializedValue = deserializedSchemaAndValue.value(); + OffsetUtils.validateFormat(deserializedValue); - result.put(origKey, deserializedValue); + result.put(origKey, (Map) deserializedValue); } catch (Throwable t) { log.error("CRITICAL: Failed to deserialize offset data when getting offsets for task with" + " namespace {}. 
No value for this data will be returned, which may break the " diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java index 4fb75e7400f..be8c7187308 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java @@ -17,16 +17,13 @@ package org.apache.kafka.copycat.storage; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.copycat.data.SchemaAndValue; import org.apache.kafka.copycat.errors.CopycatException; import org.apache.kafka.copycat.util.Callback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; +import java.util.*; import java.util.concurrent.Future; /** @@ -64,32 +61,27 @@ import java.util.concurrent.Future; * This class is not thread-safe. It should only be accessed from a Task's processing thread. *
*/ -public class OffsetStorageWriter { +public class OffsetStorageWriter { private static final Logger log = LoggerFactory.getLogger(OffsetStorageWriter.class); private final OffsetBackingStore backingStore; - private final Converter keyConverter; - private final Converter valueConverter; - private final Serializer keySerializer; - private final Serializer valueSerializer; + private final Converter keyConverter; + private final Converter valueConverter; private final String namespace; // Offset data in Copycat format - private Map data = new HashMap<>(); + private Map, Map> data = new HashMap<>(); // Not synchronized, should only be accessed by flush thread - private Map toFlush = null; + private Map, Map> toFlush = null; // Unique ID for each flush request to handle callbacks after timeouts private long currentFlushId = 0; public OffsetStorageWriter(OffsetBackingStore backingStore, - String namespace, Converter keyConverter, Converter valueConverter, - Serializer keySerializer, Serializer valueSerializer) { + String namespace, Converter keyConverter, Converter valueConverter) { this.backingStore = backingStore; this.namespace = namespace; this.keyConverter = keyConverter; this.valueConverter = valueConverter; - this.keySerializer = keySerializer; - this.valueSerializer = valueSerializer; } /** @@ -97,8 +89,8 @@ public class OffsetStorageWriter { * @param partition the partition to store an offset for * @param offset the offset */ - public synchronized void offset(SchemaAndValue partition, SchemaAndValue offset) { - data.put(partition, offset); + public synchronized void offset(Map partition, Map offset) { + data.put((Map) partition, (Map) offset); } private boolean flushing() { @@ -142,10 +134,14 @@ public class OffsetStorageWriter { Map offsetsSerialized; try { offsetsSerialized = new HashMap<>(); - for (Map.Entry entry : toFlush.entrySet()) { - byte[] key = keySerializer.serialize(namespace, keyConverter.fromCopycatData(entry.getKey().schema(), entry.getKey().value())); + for (Map.Entry, Map> entry : toFlush.entrySet()) { + // Offsets are specified as schemaless to the converter, using whatever internal schema is appropriate + // for that data. The only enforcement of the format is here. + OffsetUtils.validateFormat(entry.getKey()); + OffsetUtils.validateFormat(entry.getValue()); + byte[] key = keyConverter.fromCopycatData(namespace, null, entry.getKey()); ByteBuffer keyBuffer = (key != null) ? ByteBuffer.wrap(key) : null; - byte[] value = valueSerializer.serialize(namespace, valueConverter.fromCopycatData(entry.getValue().schema(), entry.getValue().value())); + byte[] value = valueConverter.fromCopycatData(namespace, null, entry.getValue()); ByteBuffer valueBuffer = (value != null) ? ByteBuffer.wrap(value) : null; offsetsSerialized.put(keyBuffer, valueBuffer); } @@ -155,6 +151,7 @@ public class OffsetStorageWriter { log.error("CRITICAL: Failed to serialize offset data, making it impossible to commit " + "offsets under namespace {}. 
This likely won't recover unless the " + "unserializable partition or offset information is overwritten.", namespace); + log.error("Cause of serialization failure:", t); callback.onCompletion(t, null); return null; } diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java new file mode 100644 index 00000000000..bd3a87b5b23 --- /dev/null +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.storage; + +import org.apache.kafka.copycat.data.CopycatSchema; +import org.apache.kafka.copycat.data.Schema; +import org.apache.kafka.copycat.errors.DataException; + +import java.util.Map; + +public class OffsetUtils { + public static void validateFormat(Object offsetData) { + if (!(offsetData instanceof Map)) + throw new DataException("Offsets must be specified as a Map"); + validateFormat((Map) offsetData); + } + + public static void validateFormat(Map offsetData) { + for (Map.Entry entry : offsetData.entrySet()) { + if (!(entry.getKey() instanceof String)) + throw new DataException("Offsets may only use String keys"); + + Object value = entry.getValue(); + if (value == null) + continue; + Schema.Type schemaType = CopycatSchema.schemaType(value.getClass()); + if (!schemaType.isPrimitive()) + throw new DataException("Offsets may only contain primitive types as values, but field " + entry.getKey() + " contains " + schemaType); + } + } +} diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java index 54e9bc6cc40..542ed768304 100644 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java +++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java @@ -67,10 +67,10 @@ public class WorkerSinkTaskTest extends ThreadedTest { private Time time; @Mock private SinkTask sinkTask; private WorkerConfig workerConfig; - @Mock private Converter keyConverter; + @Mock private Converter keyConverter; @Mock - private Converter valueConverter; - private WorkerSinkTask workerTask; + private Converter valueConverter; + private WorkerSinkTask workerTask; @Mock private KafkaConsumer consumer; private WorkerSinkTaskThread workerThread; @@ -84,10 +84,10 @@ public class WorkerSinkTaskTest extends ThreadedTest { Properties workerProps = new Properties(); workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter"); workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter"); - 
workerProps.setProperty("key.serializer", "org.apache.kafka.copycat.json.JsonSerializer"); - workerProps.setProperty("value.serializer", "org.apache.kafka.copycat.json.JsonSerializer"); - workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer"); - workerProps.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer"); + workerProps.setProperty("offset.key.converter", "org.apache.kafka.copycat.json.JsonConverter"); + workerProps.setProperty("offset.value.converter", "org.apache.kafka.copycat.json.JsonConverter"); + workerProps.setProperty("offset.key.converter.schemas.enable", "false"); + workerProps.setProperty("offset.value.converter.schemas.enable", "false"); workerConfig = new WorkerConfig(workerProps); workerTask = PowerMock.createPartialMock( WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"}, @@ -138,12 +138,12 @@ public class WorkerSinkTaskTest extends ThreadedTest { ConsumerRecords records = new ConsumerRecords<>( Collections.singletonMap( - new TopicPartition("topic", 0), - Collections.singletonList(new ConsumerRecord<>("topic", 0, 0, RAW_KEY, RAW_VALUE)))); + new TopicPartition(TOPIC, 0), + Collections.singletonList(new ConsumerRecord<>(TOPIC, 0, 0, RAW_KEY, RAW_VALUE)))); // Exact data doesn't matter, but should be passed directly to sink task - EasyMock.expect(keyConverter.toCopycatData(RAW_KEY)).andReturn(record); - EasyMock.expect(valueConverter.toCopycatData(RAW_VALUE)).andReturn(record); + EasyMock.expect(keyConverter.toCopycatData(EasyMock.eq(TOPIC), EasyMock.aryEq(RAW_KEY))).andReturn(record); + EasyMock.expect(valueConverter.toCopycatData(EasyMock.eq(TOPIC), EasyMock.aryEq(RAW_VALUE))).andReturn(record); Capture> capturedRecords = EasyMock.newCapture(CaptureType.ALL); sinkTask.put(EasyMock.capture(capturedRecords)); @@ -320,8 +320,8 @@ public class WorkerSinkTaskTest extends ThreadedTest { return records; } }); - EasyMock.expect(keyConverter.toCopycatData(RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).anyTimes(); - EasyMock.expect(valueConverter.toCopycatData(RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).anyTimes(); + EasyMock.expect(keyConverter.toCopycatData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).anyTimes(); + EasyMock.expect(valueConverter.toCopycatData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).anyTimes(); Capture> capturedRecords = EasyMock.newCapture(CaptureType.ALL); sinkTask.put(EasyMock.capture(capturedRecords)); EasyMock.expectLastCall().anyTimes(); diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java index 018aa942b40..3ff3a6237a2 100644 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java +++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java @@ -24,7 +24,6 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.copycat.cli.WorkerConfig; import org.apache.kafka.copycat.data.Schema; -import org.apache.kafka.copycat.data.SchemaAndValue; import org.apache.kafka.copycat.source.SourceRecord; import org.apache.kafka.copycat.source.SourceTask; import org.apache.kafka.copycat.source.SourceTaskContext; @@ -45,11 +44,7 @@ import org.powermock.api.easymock.annotation.Mock; import 
org.powermock.modules.junit4.PowerMockRunner; import org.powermock.reflect.Whitebox; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Properties; +import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -59,37 +54,36 @@ import static org.junit.Assert.*; @RunWith(PowerMockRunner.class) public class WorkerSourceTaskTest extends ThreadedTest { - private static final Schema PARTITION_SCHEMA = Schema.BYTES_SCHEMA; - private static final byte[] PARTITION_BYTES = "partition".getBytes(); - private static final Schema OFFSET_SCHEMA = Schema.BYTES_SCHEMA; - private static final byte[] OFFSET_BYTES = "offset-1".getBytes(); + private static final String TOPIC = "topic"; + private static final Map PARTITION = Collections.singletonMap("key", "partition".getBytes()); + private static final Map OFFSET = Collections.singletonMap("key", 12); // Copycat-format data private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA; private static final Integer KEY = -1; private static final Schema RECORD_SCHEMA = Schema.INT64_SCHEMA; private static final Long RECORD = 12L; - // Native-formatted data. The actual format of this data doesn't matter -- we just want to see that the right version + // Serialized data. The actual format of this data doesn't matter -- we just want to see that the right version // is used in the right place. - private static final ByteBuffer CONVERTED_KEY = ByteBuffer.wrap("converted-key".getBytes()); - private static final String CONVERTED_RECORD = "converted-record"; + private static final byte[] SERIALIZED_KEY = "converted-key".getBytes(); + private static final byte[] SERIALIZED_RECORD = "converted-record".getBytes(); private ConnectorTaskId taskId = new ConnectorTaskId("job", 0); private WorkerConfig config; @Mock private SourceTask sourceTask; - @Mock private Converter keyConverter; - @Mock private Converter valueConverter; - @Mock private KafkaProducer producer; + @Mock private Converter keyConverter; + @Mock private Converter valueConverter; + @Mock private KafkaProducer producer; @Mock private OffsetStorageReader offsetReader; - @Mock private OffsetStorageWriter offsetWriter; - private WorkerSourceTask workerTask; + @Mock private OffsetStorageWriter offsetWriter; + private WorkerSourceTask workerTask; @Mock private Future sendFuture; private Capture producerCallbacks; private static final Properties EMPTY_TASK_PROPS = new Properties(); private static final List RECORDS = Arrays.asList( - new SourceRecord(PARTITION_SCHEMA, PARTITION_BYTES, OFFSET_SCHEMA, OFFSET_BYTES, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD) + new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD) ); @Override @@ -98,16 +92,16 @@ public class WorkerSourceTaskTest extends ThreadedTest { Properties workerProps = new Properties(); workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter"); workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter"); - workerProps.setProperty("key.serializer", "org.apache.kafka.copycat.json.JsonSerializer"); - workerProps.setProperty("value.serializer", "org.apache.kafka.copycat.json.JsonSerializer"); - workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer"); - workerProps.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer"); + 
workerProps.setProperty("offset.key.converter", "org.apache.kafka.copycat.json.JsonConverter"); + workerProps.setProperty("offset.value.converter", "org.apache.kafka.copycat.json.JsonConverter"); + workerProps.setProperty("offset.key.converter.schemas.enable", "false"); + workerProps.setProperty("offset.value.converter.schemas.enable", "false"); config = new WorkerConfig(workerProps); producerCallbacks = EasyMock.newCapture(); } private void createWorkerTask() { - workerTask = new WorkerSourceTask<>(taskId, sourceTask, keyConverter, valueConverter, producer, + workerTask = new WorkerSourceTask(taskId, sourceTask, keyConverter, valueConverter, producer, offsetReader, offsetWriter, config, new SystemTime()); } @@ -201,15 +195,15 @@ public class WorkerSourceTaskTest extends ThreadedTest { List records = new ArrayList<>(); // Can just use the same record for key and value - records.add(new SourceRecord(PARTITION_SCHEMA, PARTITION_BYTES, OFFSET_SCHEMA, OFFSET_BYTES, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD)); + records.add(new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD)); - Capture> sent = expectSendRecord(); + Capture> sent = expectSendRecord(); PowerMock.replayAll(); Whitebox.invokeMethod(workerTask, "sendRecords", records); - assertEquals(CONVERTED_KEY, sent.getValue().key()); - assertEquals(CONVERTED_RECORD, sent.getValue().value()); + assertEquals(SERIALIZED_KEY, sent.getValue().key()); + assertEquals(SERIALIZED_RECORD, sent.getValue().value()); PowerMock.verifyAll(); } @@ -233,11 +227,11 @@ public class WorkerSourceTaskTest extends ThreadedTest { return latch; } - private Capture> expectSendRecord() throws InterruptedException { - EasyMock.expect(keyConverter.fromCopycatData(KEY_SCHEMA, KEY)).andStubReturn(CONVERTED_KEY); - EasyMock.expect(valueConverter.fromCopycatData(RECORD_SCHEMA, RECORD)).andStubReturn(CONVERTED_RECORD); + private Capture> expectSendRecord() throws InterruptedException { + EasyMock.expect(keyConverter.fromCopycatData(TOPIC, KEY_SCHEMA, KEY)).andStubReturn(SERIALIZED_KEY); + EasyMock.expect(valueConverter.fromCopycatData(TOPIC, RECORD_SCHEMA, RECORD)).andStubReturn(SERIALIZED_RECORD); - Capture> sent = EasyMock.newCapture(); + Capture> sent = EasyMock.newCapture(); // 1. Converted data passed to the producer, which will need callbacks invoked for flush to work EasyMock.expect( producer.send(EasyMock.capture(sent), @@ -255,7 +249,7 @@ public class WorkerSourceTaskTest extends ThreadedTest { } }); // 2. Offset data is passed to the offset storage. 
- offsetWriter.offset(new SchemaAndValue(PARTITION_SCHEMA, PARTITION_BYTES), new SchemaAndValue(OFFSET_SCHEMA, OFFSET_BYTES)); + offsetWriter.offset(PARTITION, OFFSET); PowerMock.expectLastCall().anyTimes(); return sent; diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java index 32e7ff903fe..701e23023f2 100644 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java +++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java @@ -17,8 +17,6 @@ package org.apache.kafka.copycat.runtime; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Time; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.copycat.cli.WorkerConfig; @@ -49,10 +47,6 @@ public class WorkerTest extends ThreadedTest { private ConnectorTaskId taskId = new ConnectorTaskId("job", 0); private Worker worker; private OffsetBackingStore offsetBackingStore = PowerMock.createMock(OffsetBackingStore.class); - private Serializer offsetKeySerializer = PowerMock.createMock(Serializer.class); - private Serializer offsetValueSerializer = PowerMock.createMock(Serializer.class); - private Deserializer offsetKeyDeserializer = PowerMock.createMock(Deserializer.class); - private Deserializer offsetValueDeserializer = PowerMock.createMock(Deserializer.class); @Before public void setup() { @@ -61,14 +55,12 @@ public class WorkerTest extends ThreadedTest { Properties workerProps = new Properties(); workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter"); workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter"); - workerProps.setProperty("key.serializer", "org.apache.kafka.copycat.json.JsonSerializer"); - workerProps.setProperty("value.serializer", "org.apache.kafka.copycat.json.JsonSerializer"); - workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer"); - workerProps.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer"); + workerProps.setProperty("offset.key.converter", "org.apache.kafka.copycat.json.JsonConverter"); + workerProps.setProperty("offset.value.converter", "org.apache.kafka.copycat.json.JsonConverter"); + workerProps.setProperty("offset.key.converter.schemas.enable", "false"); + workerProps.setProperty("offset.value.converter.schemas.enable", "false"); WorkerConfig config = new WorkerConfig(workerProps); - worker = new Worker(new MockTime(), config, offsetBackingStore, - offsetKeySerializer, offsetValueSerializer, - offsetKeyDeserializer, offsetValueDeserializer); + worker = new Worker(new MockTime(), config, offsetBackingStore); worker.start(); } diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java index 9c0c52da978..956d0644728 100644 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java +++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java @@ -17,10 +17,6 @@ package org.apache.kafka.copycat.storage; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.copycat.data.Schema; -import org.apache.kafka.copycat.data.SchemaAndValue; -import 
org.apache.kafka.copycat.data.SchemaBuilder; import org.apache.kafka.copycat.errors.CopycatException; import org.apache.kafka.copycat.util.Callback; import org.easymock.Capture; @@ -35,9 +31,7 @@ import org.powermock.api.easymock.annotation.Mock; import org.powermock.modules.junit4.PowerMockRunner; import java.nio.ByteBuffer; -import java.util.Arrays; import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.concurrent.*; @@ -48,13 +42,9 @@ import static org.junit.Assert.assertTrue; public class OffsetStorageWriterTest { private static final String NAMESPACE = "namespace"; // Copycat format - any types should be accepted here - private static final Schema OFFSET_KEY_SCHEMA = SchemaBuilder.array(Schema.STRING_SCHEMA).build(); - private static final List OFFSET_KEY = Arrays.asList("key", "key"); - private static final Schema OFFSET_VALUE_SCHEMA = Schema.STRING_SCHEMA; - private static final String OFFSET_VALUE = "value"; - // Native objects - must match serializer types - private static final int OFFSET_KEY_CONVERTED = 12; - private static final String OFFSET_VALUE_CONVERTED = "value-converted"; + private static final Map OFFSET_KEY = Collections.singletonMap("key", "key"); + private static final Map OFFSET_VALUE = Collections.singletonMap("key", 12); + // Serialized private static final byte[] OFFSET_KEY_SERIALIZED = "key-serialized".getBytes(); private static final byte[] OFFSET_VALUE_SERIALIZED = "value-serialized".getBytes(); @@ -63,11 +53,9 @@ public class OffsetStorageWriterTest { ByteBuffer.wrap(OFFSET_VALUE_SERIALIZED)); @Mock private OffsetBackingStore store; - @Mock private Converter keyConverter; - @Mock private Converter valueConverter; - @Mock private Serializer keySerializer; - @Mock private Serializer valueSerializer; - private OffsetStorageWriter writer; + @Mock private Converter keyConverter; + @Mock private Converter valueConverter; + private OffsetStorageWriter writer; private static Exception exception = new RuntimeException("error"); @@ -75,7 +63,7 @@ public class OffsetStorageWriterTest { @Before public void setup() { - writer = new OffsetStorageWriter<>(store, NAMESPACE, keyConverter, valueConverter, keySerializer, valueSerializer); + writer = new OffsetStorageWriter(store, NAMESPACE, keyConverter, valueConverter); service = Executors.newFixedThreadPool(1); } @@ -92,7 +80,7 @@ public class OffsetStorageWriterTest { PowerMock.replayAll(); - writer.offset(new SchemaAndValue(OFFSET_KEY_SCHEMA, OFFSET_KEY), new SchemaAndValue(OFFSET_VALUE_SCHEMA, OFFSET_VALUE)); + writer.offset(OFFSET_KEY, OFFSET_VALUE); assertTrue(writer.beginFlush()); writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS); @@ -128,7 +116,7 @@ public class OffsetStorageWriterTest { PowerMock.replayAll(); - writer.offset(new SchemaAndValue(OFFSET_KEY_SCHEMA, OFFSET_KEY), new SchemaAndValue(OFFSET_VALUE_SCHEMA, OFFSET_VALUE)); + writer.offset(OFFSET_KEY, OFFSET_VALUE); assertTrue(writer.beginFlush()); writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS); assertTrue(writer.beginFlush()); @@ -148,7 +136,7 @@ public class OffsetStorageWriterTest { PowerMock.replayAll(); - writer.offset(new SchemaAndValue(OFFSET_KEY_SCHEMA, OFFSET_KEY), new SchemaAndValue(OFFSET_VALUE_SCHEMA, OFFSET_VALUE)); + writer.offset(OFFSET_KEY, OFFSET_VALUE); assertTrue(writer.beginFlush()); writer.doFlush(callback); assertTrue(writer.beginFlush()); // should throw @@ -160,7 +148,7 @@ public class OffsetStorageWriterTest { public void testCancelBeforeAwaitFlush() { PowerMock.replayAll(); - 
writer.offset(new SchemaAndValue(OFFSET_KEY_SCHEMA, OFFSET_KEY), new SchemaAndValue(OFFSET_VALUE_SCHEMA, OFFSET_VALUE)); + writer.offset(OFFSET_KEY, OFFSET_VALUE); assertTrue(writer.beginFlush()); writer.cancelFlush(); @@ -178,7 +166,7 @@ public class OffsetStorageWriterTest { PowerMock.replayAll(); - writer.offset(new SchemaAndValue(OFFSET_KEY_SCHEMA, OFFSET_KEY), new SchemaAndValue(OFFSET_VALUE_SCHEMA, OFFSET_VALUE)); + writer.offset(OFFSET_KEY, OFFSET_VALUE); assertTrue(writer.beginFlush()); // Start the flush, then immediately cancel before allowing the mocked store request to finish Future flushFuture = writer.doFlush(callback); @@ -207,10 +195,8 @@ public class OffsetStorageWriterTest { private void expectStore(final Callback callback, final boolean fail, final CountDownLatch waitForCompletion) { - EasyMock.expect(keyConverter.fromCopycatData(OFFSET_KEY_SCHEMA, OFFSET_KEY)).andReturn(OFFSET_KEY_CONVERTED); - EasyMock.expect(keySerializer.serialize(NAMESPACE, OFFSET_KEY_CONVERTED)).andReturn(OFFSET_KEY_SERIALIZED); - EasyMock.expect(valueConverter.fromCopycatData(OFFSET_VALUE_SCHEMA, OFFSET_VALUE)).andReturn(OFFSET_VALUE_CONVERTED); - EasyMock.expect(valueSerializer.serialize(NAMESPACE, OFFSET_VALUE_CONVERTED)).andReturn(OFFSET_VALUE_SERIALIZED); + EasyMock.expect(keyConverter.fromCopycatData(NAMESPACE, null, OFFSET_KEY)).andReturn(OFFSET_KEY_SERIALIZED); + EasyMock.expect(valueConverter.fromCopycatData(NAMESPACE, null, OFFSET_VALUE)).andReturn(OFFSET_VALUE_SERIALIZED); final Capture> storeCallback = Capture.newInstance(); EasyMock.expect(store.set(EasyMock.eq(NAMESPACE), EasyMock.eq(OFFSETS_SERIALIZED), diff --git a/tests/kafkatest/tests/copycat_test.py b/tests/kafkatest/tests/copycat_test.py index 344f7ef66cb..b4adf53face 100644 --- a/tests/kafkatest/tests/copycat_test.py +++ b/tests/kafkatest/tests/copycat_test.py @@ -15,8 +15,10 @@ from kafkatest.tests.kafka_test import KafkaTest from kafkatest.services.copycat import CopycatStandaloneService +from kafkatest.services.console_consumer import ConsoleConsumer from ducktape.utils.util import wait_until -import hashlib, subprocess +from ducktape.mark import parametrize +import hashlib, subprocess, json class CopycatStandaloneFileTest(KafkaTest): """ @@ -30,8 +32,14 @@ class CopycatStandaloneFileTest(KafkaTest): OFFSETS_FILE = "/mnt/copycat.offsets" - FIRST_INPUT = "foo\nbar\nbaz\n" - SECOND_INPUT = "razz\nma\ntazz\n" + TOPIC = "test" + + FIRST_INPUT_LIST = ["foo", "bar", "baz"] + FIRST_INPUT = "\n".join(FIRST_INPUT_LIST) + "\n" + SECOND_INPUT_LIST = ["razz", "ma", "tazz"] + SECOND_INPUT = "\n".join(SECOND_INPUT_LIST) + "\n" + + SCHEMA = { "type": "string", "optional": False } def __init__(self, test_context): super(CopycatStandaloneFileTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={ @@ -40,8 +48,18 @@ class CopycatStandaloneFileTest(KafkaTest): self.source = CopycatStandaloneService(test_context, self.kafka, [self.INPUT_FILE, self.OFFSETS_FILE]) self.sink = CopycatStandaloneService(test_context, self.kafka, [self.OUTPUT_FILE, self.OFFSETS_FILE]) + self.consumer_validator = ConsoleConsumer(test_context, 1, self.kafka, self.TOPIC, consumer_timeout_ms=1000) + + @parametrize(converter="org.apache.kafka.copycat.json.JsonConverter", schemas=True) + @parametrize(converter="org.apache.kafka.copycat.json.JsonConverter", schemas=False) + @parametrize(converter="org.apache.kafka.copycat.storage.StringConverter", schemas=None) + def test_file_source_and_sink(self, converter="org.apache.kafka.json.JsonConverter", schemas=True): 
+ assert converter != None, "converter type must be set" + # Template parameters + self.key_converter = converter + self.value_converter = converter + self.schemas = schemas - def test_file_source_and_sink(self): # These need to be set self.source.set_configs(self.render("copycat-standalone.properties"), self.render("copycat-file-source.properties")) self.sink.set_configs(self.render("copycat-standalone.properties"), self.render("copycat-file-sink.properties")) @@ -61,6 +79,13 @@ class CopycatStandaloneFileTest(KafkaTest): self.source.node.account.ssh("echo -e -n " + repr(self.SECOND_INPUT) + " >> " + self.INPUT_FILE) wait_until(lambda: self.validate_output(self.FIRST_INPUT + self.SECOND_INPUT), timeout_sec=60, err_msg="Sink output file never converged to the same state as the input file") + # Validate the format of the data in the Kafka topic + self.consumer_validator.run() + expected = json.dumps([line if not self.schemas else { "schema": self.SCHEMA, "payload": line } for line in self.FIRST_INPUT_LIST + self.SECOND_INPUT_LIST]) + decoder = (json.loads if converter.endswith("JsonConverter") else str) + actual = json.dumps([decoder(x) for x in self.consumer_validator.messages_consumed[1]]) + assert expected == actual, "Expected %s but saw %s in Kafka" % (expected, actual) + def validate_output(self, value): try: output_hash = list(self.sink.node.account.ssh_capture("md5sum " + self.OUTPUT_FILE))[0].strip().split()[0] diff --git a/tests/kafkatest/tests/templates/copycat-file-sink.properties b/tests/kafkatest/tests/templates/copycat-file-sink.properties index c7865a6042c..77c43c7ffe2 100644 --- a/tests/kafkatest/tests/templates/copycat-file-sink.properties +++ b/tests/kafkatest/tests/templates/copycat-file-sink.properties @@ -17,4 +17,4 @@ name=local-file-sink connector.class=org.apache.kafka.copycat.file.FileStreamSinkConnector tasks.max=1 file={{ OUTPUT_FILE }} -topics=test \ No newline at end of file +topics={{ TOPIC }} \ No newline at end of file diff --git a/tests/kafkatest/tests/templates/copycat-file-source.properties b/tests/kafkatest/tests/templates/copycat-file-source.properties index 8612ed70de5..68dabc21dca 100644 --- a/tests/kafkatest/tests/templates/copycat-file-source.properties +++ b/tests/kafkatest/tests/templates/copycat-file-source.properties @@ -17,4 +17,4 @@ name=local-file-source connector.class=org.apache.kafka.copycat.file.FileStreamSourceConnector tasks.max=1 file={{ INPUT_FILE }} -topic=test \ No newline at end of file +topic={{ TOPIC }} \ No newline at end of file diff --git a/tests/kafkatest/tests/templates/copycat-standalone.properties b/tests/kafkatest/tests/templates/copycat-standalone.properties index 5ffb487d70f..39db6cec07e 100644 --- a/tests/kafkatest/tests/templates/copycat-standalone.properties +++ b/tests/kafkatest/tests/templates/copycat-standalone.properties @@ -15,11 +15,18 @@ bootstrap.servers={{ kafka.bootstrap_servers() }} -key.converter=org.apache.kafka.copycat.json.JsonConverter -value.converter=org.apache.kafka.copycat.json.JsonConverter -key.serializer=org.apache.kafka.copycat.json.JsonSerializer -value.serializer=org.apache.kafka.copycat.json.JsonSerializer -key.deserializer=org.apache.kafka.copycat.json.JsonDeserializer -value.deserializer=org.apache.kafka.copycat.json.JsonDeserializer +key.converter={{ key_converter|default("org.apache.kafka.copycat.json.JsonConverter") }} +value.converter={{ value_converter|default("org.apache.kafka.copycat.json.JsonConverter") }} +{% if key_converter is not defined or 
key_converter.endswith("JsonConverter") %} +key.converter.schemas.enable={{ schemas|default(True)|string|lower }} +{% endif %} +{% if value_converter is not defined or value_converter.endswith("JsonConverter") %} +value.converter.schemas.enable={{ schemas|default(True)|string|lower }} +{% endif %} + +offset.key.converter=org.apache.kafka.copycat.json.JsonConverter +offset.value.converter=org.apache.kafka.copycat.json.JsonConverter +offset.key.converter.schemas.enable=false +offset.value.converter.schemas.enable=false offset.storage.file.filename={{ OFFSETS_FILE }}
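
For reference, a minimal sketch of the unified call pattern the updated worker tests exercise. Only the two Converter methods and the data classes visible in the patch are assumed; the helper class, its package, and the example values are hypothetical.

    package org.apache.kafka.copycat.example;   // hypothetical package, illustration only

    import org.apache.kafka.copycat.data.Schema;
    import org.apache.kafka.copycat.data.SchemaAndValue;
    import org.apache.kafka.copycat.storage.Converter;

    /**
     * The worker now drives a single Converter in both directions:
     * fromCopycatData() serializes a (schema, value) pair straight to byte[]
     * for the producer, and toCopycatData() turns consumed byte[] back into a
     * SchemaAndValue for the sink task -- no intermediate Serializer/Deserializer.
     */
    public class ConverterRoundTrip {
        public static SchemaAndValue roundTrip(Converter converter, String topic,
                                               Schema schema, Object value) {
            // Source path: Copycat data -> serialized bytes handed to the Kafka producer
            byte[] serialized = converter.fromCopycatData(topic, schema, value);
            // Sink path: consumed bytes -> Copycat data handed to the SinkTask
            return converter.toCopycatData(topic, serialized);
        }
    }

With JsonConverter and schemas.enable=true, the serialized bytes carry a JSON envelope of the form {"schema": {"type": "string", "optional": false}, "payload": "foo"}; with schemas.enable=false the envelope is dropped and only the payload value is written, while StringConverter produces the raw string. This is exactly what the parametrized system test asserts against the console consumer output.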
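
Similarly, a sketch of the restricted offset format that the new OffsetUtils.validateFormat enforces: source partitions and offsets are plain Maps with String keys and primitive values, so the configured offset converters can serialize them without any connector-specific schema. The field names and values below are made up, and the exact generic bounds on SourceRecord's map parameters are assumed; only the constructor shape mirrored from the updated tests and the OffsetUtils call come from the patch.

    package org.apache.kafka.copycat.example;   // hypothetical package, illustration only

    import org.apache.kafka.copycat.data.Schema;
    import org.apache.kafka.copycat.source.SourceRecord;
    import org.apache.kafka.copycat.storage.OffsetUtils;

    import java.util.Collections;
    import java.util.Map;

    public class OffsetFormatExample {
        public static SourceRecord example() {
            // A file-style source position: String keys, primitive values only
            Map<String, Object> partition =
                    Collections.<String, Object>singletonMap("filename", "/tmp/input.txt");
            Map<String, Object> offset =
                    Collections.<String, Object>singletonMap("position", 42L);

            // Throws DataException if a key is not a String or a value is not a primitive type
            OffsetUtils.validateFormat(offset);

            // Mirrors the constructor call used in the updated tests:
            // (partition, offset, topic, kafkaPartition, keySchema, key, valueSchema, value)
            return new SourceRecord(partition, offset, "topic", null,
                    Schema.STRING_SCHEMA, "key", Schema.STRING_SCHEMA, "value");
        }
    }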