Replace Avro serializer with JSON serializer.

Ewen Cheslack-Postava 2015-07-29 22:35:05 -07:00
parent 1243a7c3cb
commit 220e42d8ae
22 changed files with 885 additions and 980 deletions

View File

@@ -75,7 +75,7 @@ do
CLASSPATH=$CLASSPATH:$file
done
for pkg in "copycat-api" "copycat-avro" "copycat-data" "copycat-file" "copycat-runtime"
for pkg in "copycat-data" "copycat-api" "copycat-runtime" "copycat-file" "copycat-json"
do
for file in $base_dir/${pkg}/build/libs/${pkg}*.jar $base_dir/${pkg}/build/dependant-libs/*.jar;
do

View File

@@ -33,7 +33,6 @@ allprojects {
apply plugin: 'idea'
repositories {
mavenCentral()
mavenLocal() // FIXME Currently required to get io.confluent:kafka-avro-serializer:2.0-SNAPSHOT
}
}
@@ -205,7 +204,7 @@ for ( sv in ['2_10_5', '2_11_7'] ) {
}
}
def copycatPkgs = ['copycat-data', 'copycat-api', 'copycat-runtime', 'copycat-avro', 'copycat-file']
def copycatPkgs = ['copycat-data', 'copycat-api', 'copycat-runtime', 'copycat-json', 'copycat-file']
def pkgs = ['clients', 'examples', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'log4j-appender', 'tools'] + copycatPkgs
tasks.create(name: "jarCopycat", dependsOn: copycatPkgs.collect { it + ":jar" }) {}
@@ -588,14 +587,14 @@ project(':copycat-api') {
test.dependsOn('checkstyleMain', 'checkstyleTest')
}
project(':copycat-avro') {
project(':copycat-json') {
apply plugin: 'checkstyle'
archivesBaseName = "copycat-avro"
archivesBaseName = "copycat-json"
dependencies {
compile project(':copycat-api')
compile "org.slf4j:slf4j-api:1.7.6"
compile "io.confluent:kafka-avro-serializer:2.0-SNAPSHOT"
compile 'com.fasterxml.jackson.core:jackson-databind:2.5.4'
testCompile 'junit:junit:4.6'
testCompile 'org.easymock:easymock:3.3.1'
@@ -652,7 +651,6 @@ project(':copycat-runtime') {
dependencies {
compile project(':copycat-api')
compile project(':copycat-avro')
compile project(':clients')
compile "org.slf4j:slf4j-api:1.7.6"
@@ -661,6 +659,7 @@ project(':copycat-runtime') {
testCompile 'org.powermock:powermock-module-junit4:1.6.2'
testCompile 'org.powermock:powermock-api-easymock:1.6.2'
testRuntime "$slf4jlog4j"
testRuntime project(":copycat-json")
}
task testJar(type: Jar) {

View File

@@ -122,14 +122,6 @@
<allow pkg="org.apache.kafka.copycat.storage" />
</subpackage>
<subpackage name="avro">
<allow pkg="org.apache.avro" />
<allow pkg="org.apache.kafka.copycat" />
<!-- FIXME/move Avro plugin out of Kafka -->
<allow pkg="io.confluent.kafka.serializers" />
<allow pkg="io.confluent.kafka.schemaregistry" />
</subpackage>
<subpackage name="runtime">
<allow pkg="org.apache.kafka.copycat" />
<allow pkg="org.apache.kafka.common" />
@@ -157,6 +149,13 @@
<allow pkg="org.apache.kafka.copycat" />
</subpackage>
<subpackage name="json">
<allow pkg="com.fasterxml.jackson" />
<allow pkg="org.apache.kafka.common.serialization" />
<allow pkg="org.apache.kafka.common.errors" />
<allow pkg="org.apache.kafka.copycat.storage" />
</subpackage>
<subpackage name="file">
<allow pkg="org.apache.kafka.copycat" />
<!-- for tests -->

View File

@@ -14,14 +14,12 @@
# limitations under the License.
bootstrap.servers=localhost:9092
schema.registry.url=http://localhost:8081
# TODO: Non-avro built-ins?
converter=org.apache.kafka.copycat.avro.AvroConverter
key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
key.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
converter=org.apache.kafka.copycat.json.JsonConverter
key.serializer=org.apache.kafka.copycat.json.JsonSerializer
value.serializer=org.apache.kafka.copycat.json.JsonSerializer
key.deserializer=org.apache.kafka.copycat.json.JsonDeserializer
value.deserializer=org.apache.kafka.copycat.json.JsonDeserializer
offset.storage.class=org.apache.kafka.copycat.storage.FileOffsetBackingStore
offset.storage.file.filename=/tmp/copycat.offsets
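For illustration only (not part of this commit): the same JsonSerializer can be used with the standard Java producer client outside the Copycat worker. A minimal sketch, assuming a local broker and a hypothetical topic named copycat-test; the envelope is built with the public JsonSchema.envelope helper added later in this diff.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.copycat.json.JsonSchema;

import java.util.Properties;

public class JsonProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
        KafkaProducer<String, JsonNode> producer = new KafkaProducer<>(props);
        // JsonSerializer requires values wrapped in the { "schema": ..., "payload": ... } envelope.
        JsonNode value = JsonSchema.envelope(
                JsonNodeFactory.instance.objectNode().put("type", "string"),
                JsonNodeFactory.instance.textNode("hello"));
        producer.send(new ProducerRecord<>("copycat-test", "key1", value));
        producer.close();
    }
}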

View File

@@ -15,11 +15,9 @@
# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092
schema.registry.url=http://localhost:8081
# TODO: Non-avro built-ins?
converter=org.apache.kafka.copycat.avro.AvroConverter
key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
key.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
converter=org.apache.kafka.copycat.json.JsonConverter
key.serializer=org.apache.kafka.copycat.json.JsonSerializer
value.serializer=org.apache.kafka.copycat.json.JsonSerializer
key.deserializer=org.apache.kafka.copycat.json.JsonDeserializer
value.deserializer=org.apache.kafka.copycat.json.JsonDeserializer

View File

@@ -1,35 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.avro;
import org.apache.kafka.copycat.storage.Converter;
/**
* Implementation of Converter that uses Avro schemas and objects.
*/
public class AvroConverter implements Converter {
@Override
public Object fromCopycatData(Object value) {
return AvroData.convertToAvro(value);
}
@Override
public Object toCopycatData(Object value) {
return AvroData.convertFromAvro(value);
}
}

View File

@@ -1,350 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.avro;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.*;
import org.apache.kafka.copycat.data.GenericRecord;
import org.apache.kafka.copycat.data.GenericRecordBuilder;
import org.apache.kafka.copycat.data.Schema;
import org.apache.kafka.copycat.data.SchemaBuilder;
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
/**
* Utilities for converting between our runtime data format and Avro, and (de)serializing that data.
*/
public class AvroData {
private static final Map<String, org.apache.avro.Schema> PRIMITIVE_SCHEMAS;
static {
org.apache.avro.Schema.Parser parser = new org.apache.avro.Schema.Parser();
PRIMITIVE_SCHEMAS = new HashMap<String, org.apache.avro.Schema>();
PRIMITIVE_SCHEMAS.put("Null", createPrimitiveSchema(parser, "null"));
PRIMITIVE_SCHEMAS.put("Boolean", createPrimitiveSchema(parser, "boolean"));
PRIMITIVE_SCHEMAS.put("Integer", createPrimitiveSchema(parser, "int"));
PRIMITIVE_SCHEMAS.put("Long", createPrimitiveSchema(parser, "long"));
PRIMITIVE_SCHEMAS.put("Float", createPrimitiveSchema(parser, "float"));
PRIMITIVE_SCHEMAS.put("Double", createPrimitiveSchema(parser, "double"));
PRIMITIVE_SCHEMAS.put("String", createPrimitiveSchema(parser, "string"));
PRIMITIVE_SCHEMAS.put("Bytes", createPrimitiveSchema(parser, "bytes"));
}
private static final EncoderFactory ENCODER_FACTORY = EncoderFactory.get();
private static final DecoderFactory DECODER_FACTORY = DecoderFactory.get();
/**
* Convert this object, in the org.apache.kafka.copycat.data format, into an Avro object.
*/
public static Object convertToAvro(Object value) {
if (value == null || value instanceof Byte || value instanceof Short ||
value instanceof Integer || value instanceof Long ||
value instanceof Float || value instanceof Double ||
value instanceof Boolean ||
value instanceof byte[] || value instanceof ByteBuffer ||
value instanceof CharSequence) {
// Note that using CharSequence allows Utf8 -- either copycat or Avro variants -- to pass
// through, but this should be fine as long as they are always handled as CharSequences
return value;
} else if (value instanceof GenericRecord) {
GenericRecord recordValue = (GenericRecord) value;
org.apache.avro.Schema avroSchema = asAvroSchema(recordValue.getSchema());
org.apache.avro.generic.GenericRecordBuilder builder
= new org.apache.avro.generic.GenericRecordBuilder(avroSchema);
for (Schema.Field field : recordValue.getSchema().getFields()) {
builder.set(field.name(), convertToAvro(recordValue.get(field.name())));
}
return builder.build();
} else if (value instanceof Collection) {
Collection collection = (Collection) value;
List<Object> converted = new ArrayList<Object>(collection.size());
for (Object elem : collection) {
converted.add(convertToAvro(elem));
}
return converted;
} else if (value instanceof Map) {
Map<String, Object> map = (Map<String, Object>) value;
Map<String, Object> converted = new TreeMap<String, Object>();
for (Map.Entry<String, Object> entry : map.entrySet()) {
converted.put(entry.getKey(), convertToAvro(entry.getValue()));
}
return converted;
}
// Fixed and enum are handled by byte[] and String
throw new RuntimeException("Couldn't convert " + value + " to Avro.");
}
/**
* Serialize this object, in org.apache.kafka.copycat.data format, to Avro's binary encoding
*/
public static ByteBuffer serializeToAvro(Object value) {
Object asAvro = convertToAvro(value);
if (value == null) {
return null;
}
try {
org.apache.avro.Schema schema = getSchema(asAvro);
ByteArrayOutputStream out = new ByteArrayOutputStream();
BinaryEncoder encoder = ENCODER_FACTORY.directBinaryEncoder(out, null);
DatumWriter<Object> writer;
writer = new GenericDatumWriter<Object>(schema);
writer.write(asAvro, encoder);
encoder.flush();
return ByteBuffer.wrap(out.toByteArray());
} catch (IOException e) {
throw new CopycatRuntimeException("Couldn't serialize object to Avro", e);
}
}
/**
* Convert the given object, in Avro format, into an org.apache.kafka.copycat.data object.
*/
public static Object convertFromAvro(Object value) {
if (value == null || value instanceof Byte || value instanceof Short ||
value instanceof Integer || value instanceof Long ||
value instanceof Float || value instanceof Double ||
value instanceof Boolean ||
value instanceof byte[] || value instanceof ByteBuffer) {
return value;
} else if (value instanceof CharSequence) {
// We need to be careful about CharSequences. This could be a String or a Utf8. If we passed
// Utf8 values directly through using Avro's implementation, equality wouldn't work.
if (value instanceof org.apache.avro.util.Utf8) {
return value.toString();
} else {
return value;
}
} else if (value instanceof org.apache.avro.generic.GenericRecord) {
org.apache.avro.generic.GenericRecord recordValue
= (org.apache.avro.generic.GenericRecord) value;
Schema copycatSchema = asCopycatSchema(recordValue.getSchema());
GenericRecordBuilder builder = new GenericRecordBuilder(copycatSchema);
for (org.apache.avro.Schema.Field field : recordValue.getSchema().getFields()) {
builder.set(field.name(), convertFromAvro(recordValue.get(field.name())));
}
return builder.build();
} else if (value instanceof Collection) {
Collection collection = (Collection) value;
List<Object> converted = new ArrayList<Object>(collection.size());
for (Object elem : collection) {
converted.add(convertFromAvro(elem));
}
return converted;
} else if (value instanceof Map) {
Map<String, Object> map = (Map<String, Object>) value;
Map<String, Object> converted = new TreeMap<String, Object>();
for (Map.Entry<String, Object> entry : map.entrySet()) {
converted.put(entry.getKey(), convertFromAvro(entry.getValue()));
}
return converted;
}
// Fixed and enum are handled by byte[] and String
throw new RuntimeException("Couldn't convert " + value + " from Avro.");
}
/**
* Deserialize and convert the provided binary Avro-serialized bytes into
* org.apache.kafka.copycat.data format.
*/
public static Object deserializeFromAvro(ByteBuffer serialized, Schema schema) {
org.apache.avro.Schema avroSchema = asAvroSchema(schema);
DatumReader reader = new GenericDatumReader(avroSchema);
try {
Object deserialized =
reader.read(null, DECODER_FACTORY.binaryDecoder(serialized.array(), null));
return convertFromAvro(deserialized);
} catch (IOException e) {
throw new CopycatRuntimeException("Couldn't deserialize object from Avro", e);
}
}
private static org.apache.avro.Schema createPrimitiveSchema(
org.apache.avro.Schema.Parser parser, String type) {
String schemaString = String.format("{\"type\" : \"%s\"}", type);
return parser.parse(schemaString);
}
private static org.apache.avro.Schema getSchema(Object object) {
if (object == null) {
return PRIMITIVE_SCHEMAS.get("Null");
} else if (object instanceof Boolean) {
return PRIMITIVE_SCHEMAS.get("Boolean");
} else if (object instanceof Integer) {
return PRIMITIVE_SCHEMAS.get("Integer");
} else if (object instanceof Long) {
return PRIMITIVE_SCHEMAS.get("Long");
} else if (object instanceof Float) {
return PRIMITIVE_SCHEMAS.get("Float");
} else if (object instanceof Double) {
return PRIMITIVE_SCHEMAS.get("Double");
} else if (object instanceof CharSequence) {
return PRIMITIVE_SCHEMAS.get("String");
} else if (object instanceof byte[] || object instanceof ByteBuffer) {
return PRIMITIVE_SCHEMAS.get("Bytes");
} else if (object instanceof org.apache.avro.generic.GenericContainer) {
return ((org.apache.avro.generic.GenericContainer) object).getSchema();
} else {
throw new IllegalArgumentException(
"Unsupported Avro type: " + object.getClass());
}
}
// Implementation note -- I considered trying to unify asAvroSchema and asCopycatSchema through
// a generic implementation. In practice this probably won't be worth the relatively minor
// amount of code duplication avoided since a) the generic version is messy and b) the Copycat
// type system and Avro type system are likely to diverge anyway, which would eventually
// require splitting them up.
public static org.apache.avro.Schema asAvroSchema(Schema schema) {
switch (schema.getType()) {
case BOOLEAN:
return org.apache.avro.SchemaBuilder.builder().booleanType();
case BYTES:
return org.apache.avro.SchemaBuilder.builder().bytesType();
case DOUBLE:
return org.apache.avro.SchemaBuilder.builder().doubleType();
case FLOAT:
return org.apache.avro.SchemaBuilder.builder().floatType();
case INT:
return org.apache.avro.SchemaBuilder.builder().intType();
case LONG:
return org.apache.avro.SchemaBuilder.builder().longType();
case NULL:
return org.apache.avro.SchemaBuilder.builder().nullType();
case STRING:
return org.apache.avro.SchemaBuilder.builder().stringType();
case RECORD: {
List<org.apache.avro.Schema.Field> fields = new ArrayList<org.apache.avro.Schema.Field>();
for (Schema.Field field : schema.getFields()) {
// NOTE: Order ignored
// TODO: Providing a default value would require translating since Avro's is a JsonNode
fields.add(new org.apache.avro.Schema.Field(
field.name(), asAvroSchema(field.schema()), field.doc(), null));
}
org.apache.avro.Schema result = org.apache.avro.Schema.createRecord(
schema.getName(), schema.getDoc(), schema.getNamespace(), schema.isError());
result.setFields(fields);
return result;
}
case UNION: {
List<org.apache.avro.Schema> unionTypes = new ArrayList<org.apache.avro.Schema>();
for (Schema origType : schema.getTypes()) {
unionTypes.add(asAvroSchema(origType));
}
return org.apache.avro.Schema.createUnion(unionTypes);
}
case ARRAY:
return org.apache.avro.Schema.createArray(asAvroSchema(schema.getElementType()));
case ENUM:
return org.apache.avro.Schema.createEnum(schema.getName(), schema.getDoc(), schema
.getNamespace(), schema.getEnumSymbols());
case FIXED:
return org.apache.avro.Schema.createFixed(schema.getName(), schema.getDoc(), schema
.getNamespace(), schema.getFixedSize());
case MAP:
return org.apache.avro.Schema.createMap(asAvroSchema(schema.getValueType()));
default:
throw new CopycatRuntimeException("Couldn't translate unsupported schema type "
+ schema.getType().getName() + ".");
}
}
public static Schema asCopycatSchema(org.apache.avro.Schema schema) {
switch (schema.getType()) {
case BOOLEAN:
return SchemaBuilder.builder().booleanType();
case BYTES:
return SchemaBuilder.builder().bytesType();
case DOUBLE:
return SchemaBuilder.builder().doubleType();
case FLOAT:
return SchemaBuilder.builder().floatType();
case INT:
return SchemaBuilder.builder().intType();
case LONG:
return SchemaBuilder.builder().longType();
case NULL:
return SchemaBuilder.builder().nullType();
case STRING:
return SchemaBuilder.builder().stringType();
case RECORD: {
List<Schema.Field> fields = new ArrayList<Schema.Field>();
for (org.apache.avro.Schema.Field field : schema.getFields()) {
// NOTE: Order ignored
// TODO: Providing a default value would require translating since Avro's is a JsonNode
fields.add(new Schema.Field(
field.name(), asCopycatSchema(field.schema()), field.doc(), null));
}
Schema result = Schema.createRecord(
schema.getName(), schema.getDoc(), schema.getNamespace(), schema.isError());
result.setFields(fields);
return result;
}
case UNION: {
List<Schema> unionTypes = new ArrayList<Schema>();
for (org.apache.avro.Schema origType : schema.getTypes()) {
unionTypes.add(asCopycatSchema(origType));
}
return Schema.createUnion(unionTypes);
}
case ARRAY:
return Schema.createArray(asCopycatSchema(schema.getElementType()));
case ENUM:
return Schema.createEnum(schema.getName(), schema.getDoc(), schema
.getNamespace(), schema.getEnumSymbols());
case FIXED:
return Schema.createFixed(schema.getName(), schema.getDoc(), schema
.getNamespace(), schema.getFixedSize());
case MAP:
return Schema.createMap(asCopycatSchema(schema.getValueType()));
default:
throw new CopycatRuntimeException("Couldn't translate unsupported schema type "
+ schema.getType().getName() + ".");
}
}
}

View File

@@ -1,154 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.avro;
import org.apache.kafka.copycat.data.GenericRecord;
import org.apache.kafka.copycat.data.GenericRecordBuilder;
import org.apache.kafka.copycat.data.Schema;
import org.apache.kafka.copycat.data.SchemaBuilder;
import org.junit.Assert;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import static org.junit.Assert.*;
// Tests AvroData's conversion of Copycat -> Avro
public class AvroDataToAvroTest {
// All the primitive types are pass-through
@Test
public void testNull() {
assertNull(AvroData.convertToAvro(null));
}
@Test
public void testBoolean() {
Assert.assertEquals(true, AvroData.convertToAvro(true));
}
@Test
public void testInteger() {
Assert.assertEquals(12, AvroData.convertToAvro(12));
}
@Test
public void testLong() {
Assert.assertEquals(12L, AvroData.convertToAvro(12L));
}
@Test
public void testFloat() {
Assert.assertEquals(12.2f, AvroData.convertToAvro(12.2f));
}
@Test
public void testDouble() {
Assert.assertEquals(12.2, AvroData.convertToAvro(12.2));
}
@Test
public void testBytes() {
Object converted = AvroData.convertToAvro("foo".getBytes());
assertTrue(converted instanceof byte[]);
assertEquals(ByteBuffer.wrap("foo".getBytes()), ByteBuffer.wrap((byte[]) converted));
Assert.assertEquals(ByteBuffer.wrap("foo".getBytes()),
AvroData.convertToAvro(ByteBuffer.wrap("foo".getBytes())));
}
@Test
public void testString() {
Assert.assertEquals("string", AvroData.convertToAvro("string"));
}
@Test
public void testComplex() {
Schema schema = SchemaBuilder.record("record").fields()
.name("null").type().nullType().noDefault()
.requiredBoolean("boolean")
.requiredInt("int")
.requiredLong("long")
.requiredFloat("float")
.requiredDouble("double")
.requiredBytes("bytes")
.requiredString("string")
.name("union").type().unionOf().nullType().and().intType().endUnion().noDefault()
.name("array").type().array().items().intType().noDefault()
.name("map").type().map().values().intType().noDefault()
.name("fixed").type().fixed("fixed").size(3).noDefault()
.name("enum").type().enumeration("enum").symbols("one", "two").noDefault()
.endRecord();
GenericRecord record = new GenericRecordBuilder(schema)
.set("null", null)
.set("boolean", true)
.set("int", 12)
.set("long", 12L)
.set("float", 12.2f)
.set("double", 12.2)
.set("bytes", ByteBuffer.wrap("foo".getBytes()))
.set("string", "string-value")
.set("union", 12)
.set("array", Arrays.asList(1, 2, 3))
.set("map", Collections.singletonMap("field", 1))
.set("fixed", ByteBuffer.wrap("foo".getBytes()))
.set("enum", "one")
.build();
Object convertedRecord = AvroData.convertToAvro(record);
org.apache.avro.Schema avroSchema = org.apache.avro.SchemaBuilder.record("record").fields()
.name("null").type().nullType().noDefault()
.requiredBoolean("boolean")
.requiredInt("int")
.requiredLong("long")
.requiredFloat("float")
.requiredDouble("double")
.requiredBytes("bytes")
.requiredString("string")
.name("union").type().unionOf().nullType().and().intType().endUnion().noDefault()
.name("array").type().array().items().intType().noDefault()
.name("map").type().map().values().intType().noDefault()
.name("fixed").type().fixed("fixed").size(3).noDefault()
.name("enum").type().enumeration("enum").symbols("one", "two").noDefault()
.endRecord();
org.apache.avro.generic.GenericRecord avroRecord
= new org.apache.avro.generic.GenericRecordBuilder(avroSchema)
.set("null", null)
.set("boolean", true)
.set("int", 12)
.set("long", 12L)
.set("float", 12.2f)
.set("double", 12.2)
.set("bytes", ByteBuffer.wrap("foo".getBytes()))
.set("string", "string-value")
.set("union", 12)
.set("array", Arrays.asList(1, 2, 3))
.set("map", Collections.singletonMap("field", 1))
.set("fixed", ByteBuffer.wrap("foo".getBytes()))
.set("enum", "one")
.build();
assertEquals(avroSchema, ((org.apache.avro.generic.GenericRecord) convertedRecord).getSchema());
assertEquals(avroRecord, convertedRecord);
}
}

View File

@@ -1,154 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.avro;
import org.apache.kafka.copycat.data.GenericRecord;
import org.apache.kafka.copycat.data.GenericRecordBuilder;
import org.apache.kafka.copycat.data.Schema;
import org.apache.kafka.copycat.data.SchemaBuilder;
import org.junit.Assert;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import static org.junit.Assert.*;
// Tests AvroData's conversion of Avro -> Copycat
public class AvroDataToCopycatTest {
// All the primitive types are pass-through
@Test
public void testNull() {
assertNull(AvroData.convertFromAvro(null));
}
@Test
public void testBoolean() {
Assert.assertEquals(true, AvroData.convertFromAvro(true));
}
@Test
public void testInteger() {
Assert.assertEquals(12, AvroData.convertFromAvro(12));
}
@Test
public void testLong() {
Assert.assertEquals(12L, AvroData.convertFromAvro(12L));
}
@Test
public void testFloat() {
Assert.assertEquals(12.2f, AvroData.convertFromAvro(12.2f));
}
@Test
public void testDouble() {
Assert.assertEquals(12.2, AvroData.convertFromAvro(12.2));
}
@Test
public void testBytes() {
Object converted = AvroData.convertFromAvro("foo".getBytes());
assertTrue(converted instanceof byte[]);
assertEquals(ByteBuffer.wrap("foo".getBytes()), ByteBuffer.wrap((byte[]) converted));
Assert.assertEquals(ByteBuffer.wrap("foo".getBytes()),
AvroData.convertFromAvro(ByteBuffer.wrap("foo".getBytes())));
}
@Test
public void testString() {
Assert.assertEquals("string", AvroData.convertFromAvro("string"));
}
@Test
public void testComplex() {
org.apache.avro.Schema avroSchema = org.apache.avro.SchemaBuilder.record("record").fields()
.name("null").type().nullType().noDefault()
.requiredBoolean("boolean")
.requiredInt("int")
.requiredLong("long")
.requiredFloat("float")
.requiredDouble("double")
.requiredBytes("bytes")
.requiredString("string")
.name("union").type().unionOf().nullType().and().intType().endUnion().noDefault()
.name("array").type().array().items().intType().noDefault()
.name("map").type().map().values().intType().noDefault()
.name("fixed").type().fixed("fixed").size(3).noDefault()
.name("enum").type().enumeration("enum").symbols("one", "two").noDefault()
.endRecord();
org.apache.avro.generic.GenericRecord avroRecord
= new org.apache.avro.generic.GenericRecordBuilder(avroSchema)
.set("null", null)
.set("boolean", true)
.set("int", 12)
.set("long", 12L)
.set("float", 12.2f)
.set("double", 12.2)
.set("bytes", ByteBuffer.wrap("foo".getBytes()))
.set("string", "string-value")
.set("union", 12)
.set("array", Arrays.asList(1, 2, 3))
.set("map", Collections.singletonMap("field", 1))
.set("fixed", ByteBuffer.wrap("foo".getBytes()))
.set("enum", "one")
.build();
Object convertedRecord = AvroData.convertFromAvro(avroRecord);
Schema schema = SchemaBuilder.record("record").fields()
.name("null").type().nullType().noDefault()
.requiredBoolean("boolean")
.requiredInt("int")
.requiredLong("long")
.requiredFloat("float")
.requiredDouble("double")
.requiredBytes("bytes")
.requiredString("string")
.name("union").type().unionOf().nullType().and().intType().endUnion().noDefault()
.name("array").type().array().items().intType().noDefault()
.name("map").type().map().values().intType().noDefault()
.name("fixed").type().fixed("fixed").size(3).noDefault()
.name("enum").type().enumeration("enum").symbols("one", "two").noDefault()
.endRecord();
GenericRecord record = new GenericRecordBuilder(schema)
.set("null", null)
.set("boolean", true)
.set("int", 12)
.set("long", 12L)
.set("float", 12.2f)
.set("double", 12.2)
.set("bytes", ByteBuffer.wrap("foo".getBytes()))
.set("string", "string-value")
.set("union", 12)
.set("array", Arrays.asList(1, 2, 3))
.set("map", Collections.singletonMap("field", 1))
.set("fixed", ByteBuffer.wrap("foo".getBytes()))
.set("enum", "one")
.build();
assertEquals(schema, ((GenericRecord) convertedRecord).getSchema());
assertEquals(record, convertedRecord);
}
}

View File

@@ -1,229 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.avro;
/*
import org.apache.kafka.copycat.data.Schema;
import org.apache.kafka.copycat.data.SchemaBuilder;
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;
import org.powermock.api.easymock.PowerMock;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.Future;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
*/
public class AvroOffsetStorageReaderTest {
/* FIXME
private static final String namespace = "namespace";
private OffsetBackingStore offsetBackingStore;
private OffsetStorageReaderImpl offsetReader;
// Most requests will make a request to the underlying storage and want to verify some info
// about that request
private Capture<Collection<ByteBuffer>> requestKeys;
private Capture<Callback<Map<ByteBuffer, ByteBuffer>>> callback;
private Future<Map<ByteBuffer, ByteBuffer>> storeFuture;
// Cover a few different types to verify serialization works and that for keys we're allowed to
// mix-and-match within a single request
// TODO Add testing of complex types for stream IDs & offsets to these primitive types
private static final List<Object> streamIds = Arrays.asList((Object) "stream1", (Object) 52);
private static final Object nullStreamId = null;
// Same as streamIds but with extra, unserializable entry
private static final List<Object> streamIdsWithUnserializable
= Arrays.asList((Object) "stream1", (Object) 52, new Date());
private static final List<Object> longOffsets = Arrays.asList((Object) 12L, (Object) 24L);
private static final List<Object> stringOffsets = Arrays.asList(
(Object) "offset1", (Object) "offset2");
private static final Schema longSchema = SchemaBuilder.builder().longType();
private static final Schema stringSchema = SchemaBuilder.builder().stringType();
// Serialized form of data to be returned by the storage layer
private static final Map<ByteBuffer, ByteBuffer> longsSerialized
= new HashMap<ByteBuffer, ByteBuffer>();
private static final Map<ByteBuffer, ByteBuffer> stringsSerialized
= new HashMap<ByteBuffer, ByteBuffer>();
private static final Map<ByteBuffer, ByteBuffer> singleLongSerialized
= new HashMap<ByteBuffer, ByteBuffer>();
private static final Map<ByteBuffer, ByteBuffer> nullsSerialized
= new HashMap<ByteBuffer, ByteBuffer>();
private static final Map<ByteBuffer, ByteBuffer> longsSerializedWithInvalid
= new HashMap<ByteBuffer, ByteBuffer>();
static {
for (int i = 0; i < longOffsets.size(); i++) {
longsSerialized.put(
AvroData.serializeToAvro(streamIds.get(i)),
AvroData.serializeToAvro(longOffsets.get(i)));
}
for (int i = 0; i < stringOffsets.size(); i++) {
stringsSerialized.put(
AvroData.serializeToAvro(streamIds.get(i)),
AvroData.serializeToAvro(stringOffsets.get(i)));
}
singleLongSerialized.put(
AvroData.serializeToAvro(streamIds.get(0)),
AvroData.serializeToAvro(longOffsets.get(0)));
nullsSerialized.put(null, null);
longsSerializedWithInvalid.put(
AvroData.serializeToAvro(streamIds.get(0)),
AvroData.serializeToAvro(longOffsets.get(0)));
// You need to be careful about the bytes specified here because Avro's variable length
// encoding allows some short byte sequences to be valid values for a long
longsSerializedWithInvalid.put(
AvroData.serializeToAvro(streamIds.get(1)),
ByteBuffer.wrap(new byte[0]));
}
@Before
public void setup() {
offsetBackingStore = PowerMock.createMock(OffsetBackingStore.class);
offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, namespace);
requestKeys = EasyMock.newCapture();
callback = EasyMock.newCapture();
storeFuture = PowerMock.createMock(Future.class);
}
@Test
public void testGetOffsetsStringValues() throws Exception {
expectStoreRequest(stringsSerialized);
PowerMock.replayAll();
Map<Object, Object> result = offsetReader.getOffsets(streamIds, stringSchema);
assertEquals(2, requestKeys.getValue().size());
assertEquals(mapFromLists(streamIds, stringOffsets), result);
PowerMock.verifyAll();
}
@Test
public void testGetOffsetsLongValues() throws Exception {
expectStoreRequest(longsSerialized);
PowerMock.replayAll();
Map<Object, Object> result = offsetReader.getOffsets(streamIds, longSchema);
assertEquals(2, requestKeys.getValue().size());
assertEquals(mapFromLists(streamIds, longOffsets), result);
PowerMock.verifyAll();
}
// getOffset() isn't too interesting since it's just a degenerate form of getOffsets(), so we
// just do one simple validation
@Test
public void testGetOffset() throws Exception {
expectStoreRequest(singleLongSerialized);
PowerMock.replayAll();
Object result = offsetReader.getOffset(streamIds.get(0), longSchema);
assertEquals(1, requestKeys.getValue().size());
assertEquals(longOffsets.get(0), result);
PowerMock.verifyAll();
}
@Test
public void testGetOffsetsNulls() throws Exception {
// Should be able to store/load null values
expectStoreRequest(nullsSerialized);
PowerMock.replayAll();
Object result = offsetReader.getOffset(null, longSchema);
assertEquals(1, requestKeys.getValue().size());
assertNull(result);
PowerMock.verifyAll();
}
@Test
public void testSerializationErrorReturnsOtherResults() throws Exception {
expectStoreRequest(longsSerialized);
PowerMock.replayAll();
Map<Object, Object> result = offsetReader.getOffsets(streamIdsWithUnserializable, longSchema);
assertEquals(2, requestKeys.getValue().size());
assertEquals(mapFromLists(streamIds, longOffsets), result);
PowerMock.verifyAll();
}
@Test(expected = CopycatRuntimeException.class)
public void testStorageGetFailed() throws Exception {
// backing store failed -> CopycatRuntimeException
EasyMock.expect(offsetBackingStore.get(EasyMock.eq(namespace), EasyMock.capture(requestKeys),
EasyMock.capture(callback)))
.andReturn(storeFuture);
EasyMock.expect(storeFuture.get())
.andThrow(new RuntimeException("failed to get data from backing store"));
PowerMock.replayAll();
Map<Object, Object> result = offsetReader.getOffsets(streamIds, longSchema);
assertEquals(2, requestKeys.getValue().size()); // throws
}
@Test
public void testUndeserializeableData() throws Exception {
// Should return whatever data it can, ignoring the unserializeable data
expectStoreRequest(longsSerializedWithInvalid);
PowerMock.replayAll();
Map<Object, Object> result = offsetReader.getOffsets(streamIds, longSchema);
assertEquals(2, requestKeys.getValue().size());
assertEquals(mapFromLists(streamIds.subList(0, 1), longOffsets.subList(0, 1)), result);
PowerMock.verifyAll();
}
private void expectStoreRequest(Map<ByteBuffer, ByteBuffer> result) throws Exception {
EasyMock.expect(offsetBackingStore.get(EasyMock.eq(namespace), EasyMock.capture(requestKeys),
EasyMock.capture(callback)))
.andReturn(storeFuture);
EasyMock.expect(storeFuture.get()).andReturn(result);
}
private Map<Object, Object> mapFromLists(List<Object> keys, List<Object> values) {
Map<Object, Object> result = new HashMap<Object, Object>();
for (int i = 0; i < keys.size(); i++) {
result.put(keys.get(i), values.get(i));
}
return result;
}
*/
}

View File

@@ -0,0 +1,352 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.json;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.copycat.data.*;
import org.apache.kafka.copycat.errors.CopycatRuntimeException;
import org.apache.kafka.copycat.storage.Converter;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
/**
* Implementation of Converter that uses JSON to store schemas and objects.
*/
public class JsonConverter implements Converter {
private static final HashMap<String, JsonToCopycatTypeConverter> TO_COPYCAT_CONVERTERS
= new HashMap<>();
static {
TO_COPYCAT_CONVERTERS.put(JsonSchema.BOOLEAN_TYPE_NAME, new JsonToCopycatTypeConverter() {
@Override
public Object convert(JsonNode jsonSchema, JsonNode value) {
return value.booleanValue();
}
});
TO_COPYCAT_CONVERTERS.put(JsonSchema.INT_TYPE_NAME, new JsonToCopycatTypeConverter() {
@Override
public Object convert(JsonNode jsonSchema, JsonNode value) {
return value.intValue();
}
});
TO_COPYCAT_CONVERTERS.put(JsonSchema.LONG_TYPE_NAME, new JsonToCopycatTypeConverter() {
@Override
public Object convert(JsonNode jsonSchema, JsonNode value) {
return value.longValue();
}
});
TO_COPYCAT_CONVERTERS.put(JsonSchema.FLOAT_TYPE_NAME, new JsonToCopycatTypeConverter() {
@Override
public Object convert(JsonNode jsonSchema, JsonNode value) {
return value.floatValue();
}
});
TO_COPYCAT_CONVERTERS.put(JsonSchema.DOUBLE_TYPE_NAME, new JsonToCopycatTypeConverter() {
@Override
public Object convert(JsonNode jsonSchema, JsonNode value) {
return value.doubleValue();
}
});
TO_COPYCAT_CONVERTERS.put(JsonSchema.BYTES_TYPE_NAME, new JsonToCopycatTypeConverter() {
@Override
public Object convert(JsonNode jsonSchema, JsonNode value) {
try {
return value.binaryValue();
} catch (IOException e) {
throw new CopycatRuntimeException("Invalid bytes field", e);
}
}
});
TO_COPYCAT_CONVERTERS.put(JsonSchema.STRING_TYPE_NAME, new JsonToCopycatTypeConverter() {
@Override
public Object convert(JsonNode jsonSchema, JsonNode value) {
return value.textValue();
}
});
TO_COPYCAT_CONVERTERS.put(JsonSchema.OBJECT_TYPE_NAME, new JsonToCopycatTypeConverter() {
@Override
public Object convert(JsonNode jsonSchema, JsonNode value) {
JsonNode jsonSchemaFields = jsonSchema.get(JsonSchema.OBJECT_FIELDS_FIELD_NAME);
if (jsonSchemaFields == null || !jsonSchemaFields.isArray())
throw new CopycatRuntimeException("Invalid object schema, should contain list of fields.");
HashMap<String, JsonNode> jsonSchemaFieldsByName = new HashMap<>();
for (JsonNode fieldSchema : jsonSchemaFields) {
JsonNode name = fieldSchema.get("name");
if (name == null || !name.isTextual())
throw new CopycatRuntimeException("Invalid field name");
jsonSchemaFieldsByName.put(name.textValue(), fieldSchema);
}
Schema schema = asCopycatSchema(jsonSchema);
GenericRecordBuilder builder = new GenericRecordBuilder(schema);
// TODO: We should verify both the schema fields and actual fields are exactly identical
Iterator<Map.Entry<String, JsonNode>> fieldIt = value.fields();
while (fieldIt.hasNext()) {
Map.Entry<String, JsonNode> entry = fieldIt.next();
JsonNode fieldSchema = jsonSchemaFieldsByName.get(entry.getKey());
builder.set(entry.getKey(), convertToCopycat(fieldSchema, entry.getValue()));
}
return builder.build();
}
});
TO_COPYCAT_CONVERTERS.put(JsonSchema.ARRAY_TYPE_NAME, new JsonToCopycatTypeConverter() {
@Override
public Object convert(JsonNode jsonSchema, JsonNode value) {
JsonNode elemSchema = jsonSchema.get(JsonSchema.ARRAY_ITEMS_FIELD_NAME);
if (elemSchema == null)
throw new CopycatRuntimeException("Array schema did not specify the element type");
ArrayList<Object> result = new ArrayList<>();
for (JsonNode elem : value) {
result.add(convertToCopycat(elemSchema, elem));
}
return result;
}
});
}
@Override
public JsonNode fromCopycatData(Object value) {
return convertToJsonWithSchemaEnvelope(value);
}
@Override
public Object toCopycatData(Object value) {
if (!(value instanceof JsonNode)) {
throw new CopycatRuntimeException("JsonConvert can only convert JsonNode objects.");
}
JsonNode data = (JsonNode) value;
if (!data.isObject() || data.size() != 2 || !data.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !data.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)) {
throw new CopycatRuntimeException("JSON data converted to Copycat must be in envelope containing schema");
}
return convertToCopycat(data.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME), data.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
}
private static JsonNode asJsonSchema(Schema schema) {
switch (schema.getType()) {
case BOOLEAN:
return JsonSchema.BOOLEAN_SCHEMA;
case BYTES:
return JsonSchema.BYTES_SCHEMA;
case DOUBLE:
return JsonSchema.DOUBLE_SCHEMA;
case FLOAT:
return JsonSchema.FLOAT_SCHEMA;
case INT:
return JsonSchema.INT_SCHEMA;
case LONG:
return JsonSchema.LONG_SCHEMA;
case NULL:
throw new UnsupportedOperationException("null schema not supported");
case STRING:
return JsonSchema.STRING_SCHEMA;
case RECORD: {
ObjectNode recordSchema = JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.OBJECT_TYPE_NAME);
ArrayNode fields = recordSchema.putArray(JsonSchema.OBJECT_FIELDS_FIELD_NAME);
for (Schema.Field field : schema.getFields()) {
fields.add(JsonNodeFactory.instance.objectNode().set(field.name(), asJsonSchema(field.schema())));
}
return recordSchema;
}
case UNION: {
throw new UnsupportedOperationException("union schema not supported");
}
case ARRAY:
return JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.ARRAY_TYPE_NAME)
.set(JsonSchema.ARRAY_ITEMS_FIELD_NAME, asJsonSchema(schema.getElementType()));
case ENUM:
throw new UnsupportedOperationException("enum schema not supported");
case FIXED:
throw new UnsupportedOperationException("fixed schema not supported");
case MAP:
throw new UnsupportedOperationException("map schema not supported");
default:
throw new CopycatRuntimeException("Couldn't translate unsupported schema type " + schema.getType().getName() + ".");
}
}
private static Schema asCopycatSchema(JsonNode jsonSchema) {
if (jsonSchema.isNull()) {
return null;
}
JsonNode schemaTypeNode = jsonSchema.get(JsonSchema.SCHEMA_TYPE_FIELD_NAME);
if (schemaTypeNode == null || !schemaTypeNode.isTextual()) {
throw new CopycatRuntimeException("Schema must contain 'type' field");
}
switch (schemaTypeNode.textValue()) {
case JsonSchema.BOOLEAN_TYPE_NAME:
return SchemaBuilder.builder().booleanType();
case JsonSchema.INT_TYPE_NAME:
return SchemaBuilder.builder().intType();
case JsonSchema.LONG_TYPE_NAME:
return SchemaBuilder.builder().longType();
case JsonSchema.FLOAT_TYPE_NAME:
return SchemaBuilder.builder().floatType();
case JsonSchema.DOUBLE_TYPE_NAME:
return SchemaBuilder.builder().doubleType();
case JsonSchema.BYTES_TYPE_NAME:
return SchemaBuilder.builder().bytesType();
case JsonSchema.STRING_TYPE_NAME:
return SchemaBuilder.builder().stringType();
case JsonSchema.ARRAY_TYPE_NAME:
JsonNode elemSchema = jsonSchema.get(JsonSchema.ARRAY_ITEMS_FIELD_NAME);
if (elemSchema == null)
throw new CopycatRuntimeException("Array schema did not specify the element type");
return Schema.createArray(asCopycatSchema(elemSchema));
case JsonSchema.OBJECT_TYPE_NAME:
JsonNode jsonSchemaName = jsonSchema.get(JsonSchema.SCHEMA_NAME_FIELD_NAME);
if (jsonSchemaName == null || !jsonSchemaName.isTextual())
throw new CopycatRuntimeException("Invalid object schema, should contain name.");
JsonNode jsonSchemaFields = jsonSchema.get(JsonSchema.OBJECT_FIELDS_FIELD_NAME);
if (jsonSchemaFields == null || !jsonSchemaFields.isArray())
throw new CopycatRuntimeException("Invalid object schema, should contain list of fields.");
List<Schema.Field> fields = new ArrayList<>();
// TODO: We should verify both the schema fields and actual fields are exactly identical
for (JsonNode fieldJsonSchema : jsonSchemaFields) {
JsonNode fieldName = fieldJsonSchema.get(JsonSchema.OBJECT_FIELD_NAME_FIELD_NAME);
if (fieldName == null || !fieldName.isTextual())
throw new CopycatRuntimeException("Object field missing name");
// TODO: doc, default value?
fields.add(new Schema.Field(fieldName.textValue(), asCopycatSchema(fieldJsonSchema), null, null));
}
Schema result = Schema.createRecord(jsonSchemaName.textValue(), null, null, false);
result.setFields(fields);
return result;
default:
throw new CopycatRuntimeException("Unknown schema type: " + schemaTypeNode.textValue());
}
}
/**
* Convert this object, in org.apache.kafka.copycat.data format, into a JSON object with an envelope object
* containing schema and payload fields.
* @param value the object, in org.apache.kafka.copycat.data format, to convert
* @return the resulting envelope as a JsonNode with "schema" and "payload" fields
*/
private static JsonNode convertToJsonWithSchemaEnvelope(Object value) {
return convertToJson(value).toJsonNode();
}
/**
* Convert this object, in the org.apache.kafka.copycat.data format, into a JSON object, returning both the schema
* and the converted object.
*/
private static JsonSchema.Envelope convertToJson(Object value) {
if (value == null) {
return JsonSchema.nullEnvelope();
} else if (value instanceof Boolean) {
return JsonSchema.booleanEnvelope((Boolean) value);
} else if (value instanceof Byte) {
return JsonSchema.intEnvelope((Byte) value);
} else if (value instanceof Short) {
return JsonSchema.intEnvelope((Short) value);
} else if (value instanceof Integer) {
return JsonSchema.intEnvelope((Integer) value);
} else if (value instanceof Long) {
return JsonSchema.longEnvelope((Long) value);
} else if (value instanceof Float) {
return JsonSchema.floatEnvelope((Float) value);
} else if (value instanceof Double) {
return JsonSchema.doubleEnvelope((Double) value);
} else if (value instanceof byte[]) {
return JsonSchema.bytesEnvelope((byte[]) value);
} else if (value instanceof ByteBuffer) {
return JsonSchema.bytesEnvelope(((ByteBuffer) value).array());
} else if (value instanceof CharSequence) {
return JsonSchema.stringEnvelope(value.toString());
} else if (value instanceof GenericRecord) {
GenericRecord recordValue = (GenericRecord) value;
ObjectNode schema = JsonNodeFactory.instance.objectNode();
schema.put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.OBJECT_TYPE_NAME);
schema.put(JsonSchema.SCHEMA_NAME_FIELD_NAME, recordValue.getSchema().getName());
ArrayNode schemaFields = JsonNodeFactory.instance.arrayNode();
schema.set(JsonSchema.OBJECT_FIELDS_FIELD_NAME, schemaFields);
ObjectNode record = JsonNodeFactory.instance.objectNode();
for (Schema.Field field : recordValue.getSchema().getFields()) {
JsonSchema.Envelope fieldSchemaAndValue = convertToJson(recordValue.get(field.name()));
// Fill in the field name since this is part of the field schema spec but the call to convertToJson that
// created it does not have access to the field name. This *must* copy the schema since it may be one of
// the primitive schemas.
ObjectNode fieldSchema = ((ObjectNode) fieldSchemaAndValue.schema).deepCopy();
fieldSchema.put(JsonSchema.OBJECT_FIELD_NAME_FIELD_NAME, field.name());
schemaFields.add(fieldSchema);
record.set(field.name(), fieldSchemaAndValue.payload);
}
return new JsonSchema.Envelope(schema, record);
} else if (value instanceof Collection) {
Collection collection = (Collection) value;
ObjectNode schema = JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.ARRAY_TYPE_NAME);
ArrayNode list = JsonNodeFactory.instance.arrayNode();
JsonNode itemSchema = null;
for (Object elem : collection) {
JsonSchema.Envelope fieldSchemaAndValue = convertToJson(elem);
if (itemSchema == null) {
itemSchema = fieldSchemaAndValue.schema;
schema.set(JsonSchema.ARRAY_ITEMS_FIELD_NAME, itemSchema);
} else {
if (!itemSchema.equals(fieldSchemaAndValue.schema)) {
throw new CopycatRuntimeException("Mismatching schemas found in a list.");
}
}
list.add(fieldSchemaAndValue.payload);
}
return new JsonSchema.Envelope(schema, list);
}
throw new CopycatRuntimeException("Couldn't convert " + value + " to Avro.");
}
private static Object convertToCopycat(JsonNode jsonSchema, JsonNode jsonValue) {
if (jsonSchema.isNull()) {
return null;
}
JsonNode schemaTypeNode = jsonSchema.get(JsonSchema.SCHEMA_TYPE_FIELD_NAME);
if (schemaTypeNode == null || !schemaTypeNode.isTextual()) {
throw new CopycatRuntimeException("Schema must contain 'type' field. Schema: " + jsonSchema.toString());
}
JsonToCopycatTypeConverter typeConverter = TO_COPYCAT_CONVERTERS.get(schemaTypeNode.textValue());
if (typeConverter != null) {
return typeConverter.convert(jsonSchema, jsonValue);
}
throw new CopycatRuntimeException("Unknown schema type: " + schemaTypeNode);
}
private interface JsonToCopycatTypeConverter {
Object convert(JsonNode schema, JsonNode value);
}
}
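A usage sketch (illustrative, not part of the commit): round-tripping a primitive value through the converter. fromCopycatData wraps the value in the schema/payload envelope; toCopycatData reads the schema back out and recovers the original object.

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.copycat.json.JsonConverter;

public class JsonConverterRoundTrip {
    public static void main(String[] args) {
        JsonConverter converter = new JsonConverter();
        // Produces { "schema": { "type": "long" }, "payload": 42 }
        JsonNode envelope = converter.fromCopycatData(42L);
        // Reads the "long" schema from the envelope and returns the Long 42 again
        Object restored = converter.toCopycatData(envelope);
        System.out.println(envelope + " -> " + restored);
    }
}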

View File

@@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.json;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Map;
/**
* JSON deserializer for Jackson's JsonNode tree model. Using the tree model allows it to work with arbitrarily
* structured data without having associated Java classes. This deserializer also supports Copycat schemas.
*/
public class JsonDeserializer implements Deserializer<JsonNode> {
private static final ObjectNode CATCH_ALL_OBJECT_SCHEMA = JsonNodeFactory.instance.objectNode();
private static final ObjectNode CATCH_ALL_ARRAY_SCHEMA = JsonNodeFactory.instance.objectNode();
private static final ArrayNode ALL_SCHEMAS_LIST = JsonNodeFactory.instance.arrayNode();
private static final ObjectNode CATCH_ALL_SCHEMA = JsonNodeFactory.instance.objectNode();
static {
CATCH_ALL_OBJECT_SCHEMA.put("type", "object")
.putArray("field").add(JsonNodeFactory.instance.objectNode().put("*", "all"));
CATCH_ALL_ARRAY_SCHEMA.put("type", "array").put("items", "all");
ALL_SCHEMAS_LIST.add("boolean").add("int").add("long").add("float").add("double").add("bytes").add("string")
.add(CATCH_ALL_ARRAY_SCHEMA).add(CATCH_ALL_OBJECT_SCHEMA);
CATCH_ALL_SCHEMA.put("name", "all").set("type", ALL_SCHEMAS_LIST);
}
private ObjectMapper objectMapper = new ObjectMapper();
/**
* Default constructor needed by Kafka
*/
public JsonDeserializer() {
}
@Override
public void configure(Map<String, ?> props, boolean isKey) {
}
@Override
public JsonNode deserialize(String topic, byte[] bytes) {
JsonNode data;
try {
data = objectMapper.readTree(bytes);
} catch (Exception e) {
throw new SerializationException(e);
}
// The deserialized data should either be an envelope object containing the schema and the payload or the schema
// was stripped during serialization and we need to fill in an all-encompassing schema.
if (!data.isObject() || data.size() != 2 || !data.has("schema") || !data.has("payload")) {
ObjectNode envelope = JsonNodeFactory.instance.objectNode();
envelope.set("schema", CATCH_ALL_SCHEMA);
envelope.set("payload", data);
data = envelope;
}
return data;
}
@Override
public void close() {
}
}
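An illustrative sketch (not part of the commit) of the two input shapes the deserializer accepts: data already wrapped in a schema/payload envelope is returned unchanged, while bare JSON is wrapped with the permissive catch-all schema defined above.

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.copycat.json.JsonDeserializer;

import java.nio.charset.StandardCharsets;

public class JsonDeserializerShapes {
    public static void main(String[] args) {
        JsonDeserializer deserializer = new JsonDeserializer();
        // Already enveloped: returned as-is.
        JsonNode enveloped = deserializer.deserialize("topic",
                "{\"schema\":{\"type\":\"int\"},\"payload\":12}".getBytes(StandardCharsets.UTF_8));
        // Bare JSON: wrapped as { "schema": <catch-all>, "payload": {"a":1} } before being returned.
        JsonNode wrapped = deserializer.deserialize("topic",
                "{\"a\":1}".getBytes(StandardCharsets.UTF_8));
        System.out.println(enveloped);
        System.out.println(wrapped);
    }
}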

View File

@@ -0,0 +1,117 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.json;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.nio.ByteBuffer;
public class JsonSchema {
static final String ENVELOPE_SCHEMA_FIELD_NAME = "schema";
static final String ENVELOPE_PAYLOAD_FIELD_NAME = "payload";
static final String SCHEMA_TYPE_FIELD_NAME = "type";
static final String SCHEMA_NAME_FIELD_NAME = "name";
static final String OBJECT_FIELD_NAME_FIELD_NAME = "name";
static final String OBJECT_FIELDS_FIELD_NAME = "fields";
static final String ARRAY_ITEMS_FIELD_NAME = "items";
static final String BOOLEAN_TYPE_NAME = "boolean";
static final JsonNode BOOLEAN_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, BOOLEAN_TYPE_NAME);
static final String INT_TYPE_NAME = "int";
static final JsonNode INT_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, INT_TYPE_NAME);
static final String LONG_TYPE_NAME = "long";
static final JsonNode LONG_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, LONG_TYPE_NAME);
static final String FLOAT_TYPE_NAME = "float";
static final JsonNode FLOAT_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, FLOAT_TYPE_NAME);
static final String DOUBLE_TYPE_NAME = "double";
static final JsonNode DOUBLE_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, DOUBLE_TYPE_NAME);
static final String BYTES_TYPE_NAME = "bytes";
static final JsonNode BYTES_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, BYTES_TYPE_NAME);
static final String STRING_TYPE_NAME = "string";
static final JsonNode STRING_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, STRING_TYPE_NAME);
static final String ARRAY_TYPE_NAME = "array";
static final String OBJECT_TYPE_NAME = "object";
public static ObjectNode envelope(JsonNode schema, JsonNode payload) {
ObjectNode result = JsonNodeFactory.instance.objectNode();
result.set(ENVELOPE_SCHEMA_FIELD_NAME, schema);
result.set(ENVELOPE_PAYLOAD_FIELD_NAME, payload);
return result;
}
static class Envelope {
public JsonNode schema;
public JsonNode payload;
public Envelope(JsonNode schema, JsonNode payload) {
this.schema = schema;
this.payload = payload;
}
public ObjectNode toJsonNode() {
return envelope(schema, payload);
}
}
public static Envelope nullEnvelope() {
return new Envelope(null, null);
}
public static Envelope booleanEnvelope(boolean value) {
return new Envelope(JsonSchema.BOOLEAN_SCHEMA, JsonNodeFactory.instance.booleanNode(value));
}
public static Envelope intEnvelope(byte value) {
return new Envelope(JsonSchema.INT_SCHEMA, JsonNodeFactory.instance.numberNode(value));
}
public static Envelope intEnvelope(short value) {
return new Envelope(JsonSchema.INT_SCHEMA, JsonNodeFactory.instance.numberNode(value));
}
public static Envelope intEnvelope(int value) {
return new Envelope(JsonSchema.INT_SCHEMA, JsonNodeFactory.instance.numberNode(value));
}
public static Envelope longEnvelope(long value) {
return new Envelope(JsonSchema.LONG_SCHEMA, JsonNodeFactory.instance.numberNode(value));
}
public static Envelope floatEnvelope(float value) {
return new Envelope(JsonSchema.FLOAT_SCHEMA, JsonNodeFactory.instance.numberNode(value));
}
public static Envelope doubleEnvelope(double value) {
return new Envelope(JsonSchema.DOUBLE_SCHEMA, JsonNodeFactory.instance.numberNode(value));
}
public static Envelope bytesEnvelope(byte[] value) {
return new Envelope(JsonSchema.BYTES_SCHEMA, JsonNodeFactory.instance.binaryNode(value));
}
public static Envelope bytesEnvelope(ByteBuffer value) {
return new Envelope(JsonSchema.BYTES_SCHEMA, JsonNodeFactory.instance.binaryNode(value.array()));
}
public static Envelope stringEnvelope(CharSequence value) {
return new Envelope(JsonSchema.STRING_SCHEMA, JsonNodeFactory.instance.textNode(value.toString()));
}
}
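For reference, a minimal sketch (not part of this commit) of the JSON that the envelope helpers above produce; it assumes the sketch class lives in the same org.apache.kafka.copycat.json package so the package-private Envelope type is visible, and that Jackson is on the classpath.

package org.apache.kafka.copycat.json;
import com.fasterxml.jackson.databind.ObjectMapper;
// Illustrative only: prints the envelopes built by JsonSchema's helpers.
public class JsonSchemaEnvelopeSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // Expected output: {"schema":{"type":"int"},"payload":42}
        System.out.println(mapper.writeValueAsString(JsonSchema.intEnvelope(42).toJsonNode()));
        // Expected output: {"schema":{"type":"string"},"payload":"hello"}
        System.out.println(mapper.writeValueAsString(JsonSchema.stringEnvelope("hello").toJsonNode()));
    }
}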

View File

@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.json;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import java.util.Map;
/**
 * Serialize Jackson JsonNode tree model objects to UTF-8 JSON. Using the tree model allows handling arbitrarily
 * structured data without corresponding Java classes. This serializer expects data in the Copycat envelope format
 * ({ "schema": ..., "payload": ... }); when "schemas.enable" is set to false, only the payload is written.
 */
public class JsonSerializer implements Serializer<JsonNode> {
private static final String SCHEMAS_ENABLE_CONFIG = "schemas.enable";
private static final boolean SCHEMAS_ENABLE_DEFAULT = true;
private ObjectMapper objectMapper = new ObjectMapper();
private boolean enableSchemas = SCHEMAS_ENABLE_DEFAULT;
/**
* Default constructor needed by Kafka
*/
public JsonSerializer() {
}
@Override
public void configure(Map<String, ?> config, boolean isKey) {
Object enableConfigsVal = config.get(SCHEMAS_ENABLE_CONFIG);
if (enableConfigsVal != null)
enableSchemas = enableConfigsVal.toString().equals("true");
}
@Override
public byte[] serialize(String topic, JsonNode data) {
// Null records (e.g. tombstones) carry no envelope to validate and are passed through as null.
if (data == null)
    return null;
// This serializer works for Copycat data that requires a schema to be included, so we expect it to have a
// specific format: { "schema": {...}, "payload": ... }.
if (!data.isObject() || data.size() != 2 || !data.has("schema") || !data.has("payload"))
    throw new SerializationException("JsonSerializer requires an envelope containing exactly two fields, \"schema\" and \"payload\"");
try {
if (!enableSchemas)
data = data.get("payload");
return objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
throw new SerializationException("Error serializing JSON message", e);
}
}
@Override
public void close() {
}
}
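A minimal usage sketch of the serializer above (not part of this commit); the topic name is illustrative, and the class is placed in the same package as JsonSerializer so the envelope helpers from JsonSchema can be used directly.

package org.apache.kafka.copycat.json;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
// Illustrative only: configure the serializer and write one schema/payload envelope.
public class JsonSerializerSketch {
    public static void main(String[] args) {
        JsonSerializer serializer = new JsonSerializer();
        // With schemas.enable=true (the default) the full envelope is written;
        // setting it to "false" strips the schema and writes only the payload.
        serializer.configure(Collections.singletonMap("schemas.enable", "true"), false);
        byte[] bytes = serializer.serialize("my-topic", JsonSchema.longEnvelope(42L).toJsonNode());
        // Expected output: {"schema":{"type":"long"},"payload":42}
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
        serializer.close();
    }
}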

View File

@ -0,0 +1,211 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.copycat.json;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import org.apache.kafka.copycat.data.GenericRecord;
import org.apache.kafka.copycat.data.GenericRecordBuilder;
import org.apache.kafka.copycat.data.Schema;
import org.apache.kafka.copycat.data.SchemaBuilder;
import org.junit.Test;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class JsonConverterTest {
ObjectMapper objectMapper = new ObjectMapper();
JsonConverter converter = new JsonConverter();
@Test
public void booleanToCopycat() {
assertEquals(true, converter.toCopycatData(parse("{ \"schema\": { \"type\": \"boolean\" }, \"payload\": true }")));
assertEquals(false, converter.toCopycatData(parse("{ \"schema\": { \"type\": \"boolean\" }, \"payload\": false }")));
}
@Test
public void intToCopycat() {
assertEquals(12, converter.toCopycatData(parse("{ \"schema\": { \"type\": \"int\" }, \"payload\": 12 }")));
}
@Test
public void longToCopycat() {
assertEquals(12L, converter.toCopycatData(parse("{ \"schema\": { \"type\": \"long\" }, \"payload\": 12 }")));
assertEquals(4398046511104L, converter.toCopycatData(parse("{ \"schema\": { \"type\": \"long\" }, \"payload\": 4398046511104 }")));
}
@Test
public void floatToCopycat() {
assertEquals(12.34f, converter.toCopycatData(parse("{ \"schema\": { \"type\": \"float\" }, \"payload\": 12.34 }")));
}
@Test
public void doubleToCopycat() {
assertEquals(12.34, converter.toCopycatData(parse("{ \"schema\": { \"type\": \"double\" }, \"payload\": 12.34 }")));
}
@Test
public void bytesToCopycat() throws UnsupportedEncodingException {
ByteBuffer reference = ByteBuffer.wrap("test-string".getBytes("UTF-8"));
String msg = "{ \"schema\": { \"type\": \"bytes\" }, \"payload\": \"dGVzdC1zdHJpbmc=\" }";
ByteBuffer converted = ByteBuffer.wrap((byte[]) converter.toCopycatData(parse(msg)));
assertEquals(reference, converted);
}
@Test
public void stringToCopycat() {
assertEquals("foo-bar-baz", converter.toCopycatData(parse("{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\" }")));
}
@Test
public void arrayToCopycat() {
JsonNode arrayJson = parse("{ \"schema\": { \"type\": \"array\", \"items\": { \"type\" : \"int\" } }, \"payload\": [1, 2, 3] }");
assertEquals(Arrays.asList(1, 2, 3), converter.toCopycatData(arrayJson));
}
@Test
public void objectToCopycat() {
JsonNode objectJson = parse("{ \"schema\": { \"type\": \"object\", \"name\": \"record\", \"fields\": [{ \"name\": \"first\", \"type\" : \"int\"}, { \"name\": \"second\", \"type\" : \"string\"}] }" +
", \"payload\": { \"first\": 15, \"second\": \"foobar\" } }");
Schema schema = SchemaBuilder.record("record").fields()
.requiredInt("first")
.requiredString("second")
.endRecord();
GenericRecord record = new GenericRecordBuilder(schema)
.set("first", 15)
.set("second", "foobar")
.build();
assertEquals(record, converter.toCopycatData(objectJson));
}
@Test
public void booleanToJson() {
JsonNode converted = converter.fromCopycatData(true);
validateEnvelope(converted);
assertEquals(parse("{ \"type\": \"boolean\" }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
assertEquals(true, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue());
}
@Test
public void intToJson() {
JsonNode converted = converter.fromCopycatData(12);
validateEnvelope(converted);
assertEquals(parse("{ \"type\": \"int\" }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
assertEquals(12, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).intValue());
}
@Test
public void longToJson() {
JsonNode converted = converter.fromCopycatData(4398046511104L);
validateEnvelope(converted);
assertEquals(parse("{ \"type\": \"long\" }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
assertEquals(4398046511104L, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).longValue());
}
@Test
public void floatToJson() {
JsonNode converted = converter.fromCopycatData(12.34f);
validateEnvelope(converted);
assertEquals(parse("{ \"type\": \"float\" }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
assertEquals(12.34f, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).floatValue(), 0.001);
}
@Test
public void doubleToJson() {
JsonNode converted = converter.fromCopycatData(12.34);
validateEnvelope(converted);
assertEquals(parse("{ \"type\": \"double\" }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
assertEquals(12.34, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).doubleValue(), 0.001);
}
@Test
public void bytesToJson() throws IOException {
JsonNode converted = converter.fromCopycatData("test-string".getBytes());
validateEnvelope(converted);
assertEquals(parse("{ \"type\": \"bytes\" }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
assertEquals(ByteBuffer.wrap("test-string".getBytes()),
ByteBuffer.wrap(converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).binaryValue()));
}
@Test
public void stringToJson() {
JsonNode converted = converter.fromCopycatData("test-string");
validateEnvelope(converted);
assertEquals(parse("{ \"type\": \"string\" }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
assertEquals("test-string", converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).textValue());
}
@Test
public void arrayToJson() {
JsonNode converted = converter.fromCopycatData(Arrays.asList(1, 2, 3));
validateEnvelope(converted);
assertEquals(parse("{ \"type\": \"array\", \"items\": { \"type\": \"int\" } }"),
converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
assertEquals(JsonNodeFactory.instance.arrayNode().add(1).add(2).add(3),
converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
}
@Test
public void objectToJson() {
Schema schema = SchemaBuilder.record("record").fields()
.requiredInt("first")
.requiredString("second")
.endRecord();
GenericRecord record = new GenericRecordBuilder(schema)
.set("first", 15)
.set("second", "foobar")
.build();
JsonNode converted = converter.fromCopycatData(record);
validateEnvelope(converted);
assertEquals(parse("{ \"type\": \"object\", \"name\": \"record\", \"fields\": [{ \"name\": \"first\", \"type\" : \"int\"}, { \"name\": \"second\", \"type\" : \"string\"}] }"),
converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
assertEquals(parse("{ \"first\": 15, \"second\": \"foobar\" }"),
converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
}
private JsonNode parse(String json) {
try {
return objectMapper.readTree(json);
} catch (IOException e) {
fail("IOException during JSON parse: " + e.getMessage());
throw new RuntimeException("failed");
}
}
private void validateEnvelope(JsonNode env) {
assertNotNull(env);
assertTrue(env.isObject());
assertEquals(2, env.size());
assertTrue(env.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
assertTrue(env.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isObject());
assertTrue(env.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
}
}

View File

@ -119,14 +119,14 @@ public class Worker {
Properties unusedConfigs = config.getUnusedProperties();
Map<String, Object> avroProps = new HashMap<String, Object>();
avroProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
avroProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getClass(WorkerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName());
avroProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getClass(WorkerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName());
Map<String, Object> producerProps = new HashMap<String, Object>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getClass(WorkerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getClass(WorkerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName());
for (String propName : unusedConfigs.stringPropertyNames()) {
avroProps.put(propName, unusedConfigs.getProperty(propName));
producerProps.put(propName, unusedConfigs.getProperty(propName));
}
producer = new KafkaProducer<Object, Object>(avroProps);
producer = new KafkaProducer<Object, Object>(producerProps);
offsetBackingStore.start();
sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(time, config);

View File

@ -159,7 +159,7 @@ public class WorkerSourceTask implements WorkerTask {
recordSent(producerRecord);
}
});
// Offsets are converted to Avro & serialized in the OffsetWriter
// Offsets are converted & serialized in the OffsetWriter
offsetWriter.setOffset(record.getStream(), record.getOffset());
}
}

View File

@ -77,12 +77,11 @@ public class WorkerSinkTaskTest extends ThreadedTest {
time = new MockTime();
sinkTask = PowerMock.createMock(SinkTask.class);
Properties workerProps = new Properties();
// TODO: Non-avro built-ins?
workerProps.setProperty("converter", "org.apache.kafka.copycat.avro.AvroConverter");
workerProps.setProperty("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
workerProps.setProperty("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
workerProps.setProperty("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
workerProps.setProperty("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
workerProps.setProperty("converter", "org.apache.kafka.copycat.json.JsonConverter");
workerProps.setProperty("key.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
workerProps.setProperty("value.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
workerProps.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
workerConfig = new WorkerConfig(workerProps);
converter = PowerMock.createMock(Converter.class);
workerTask = PowerMock.createPartialMock(

View File

@ -87,12 +87,11 @@ public class WorkerSourceTaskTest extends ThreadedTest {
public void setup() {
super.setup();
Properties workerProps = new Properties();
// TODO: Non-avro built-ins?
workerProps.setProperty("converter", "org.apache.kafka.copycat.avro.AvroConverter");
workerProps.setProperty("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
workerProps.setProperty("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
workerProps.setProperty("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
workerProps.setProperty("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
workerProps.setProperty("converter", "org.apache.kafka.copycat.json.JsonConverter");
workerProps.setProperty("key.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
workerProps.setProperty("value.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
workerProps.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
config = new WorkerConfig(workerProps);
sourceTask = PowerMock.createMock(SourceTask.class);
converter = PowerMock.createMock(Converter.class);

View File

@ -59,15 +59,12 @@ public class WorkerTest extends ThreadedTest {
public void setup() {
super.setup();
// TODO: Remove schema registry URL
// TODO: Non-avro built-ins?
Properties workerProps = new Properties();
workerProps.setProperty("schema.registry.url", "http://localhost:8081");
workerProps.setProperty("converter", "org.apache.kafka.copycat.avro.AvroConverter");
workerProps.setProperty("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
workerProps.setProperty("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
workerProps.setProperty("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
workerProps.setProperty("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
workerProps.setProperty("converter", "org.apache.kafka.copycat.json.JsonConverter");
workerProps.setProperty("key.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
workerProps.setProperty("value.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
workerProps.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
WorkerConfig config = new WorkerConfig(workerProps);
worker = new Worker(new MockTime(), config, offsetBackingStore,
offsetKeySerializer, offsetValueSerializer,

View File

@ -17,7 +17,6 @@
package org.apache.kafka.copycat.runtime.standalone;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.copycat.connector.Connector;
import org.apache.kafka.copycat.connector.Task;
import org.apache.kafka.copycat.errors.CopycatException;

View File

@ -15,4 +15,4 @@
apply from: file('scala.gradle')
include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients', 'tools', 'log4j-appender',
'copycat-data', 'copycat-api', 'copycat-runtime', 'copycat-avro', 'copycat-file'
'copycat-data', 'copycat-api', 'copycat-runtime', 'copycat-json', 'copycat-file'