mirror of https://github.com/apache/kafka.git
KAFKA-2475: Make Copycat only have a Converter class instead of Serializer, Deserializer, and Converter.
The Converter class now translates directly between byte[] and Copycat's data API instead of requiring an intermediate runtime type like Avro's GenericRecord or Jackson's JsonNode.
parent 2fa8c74853
commit 4bed051e56
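The practical effect is that a worker now needs a single pluggable class per key/value side instead of a converter plus a serializer/deserializer pair. A minimal sketch of driving the reworked interface (the topic name and config map below are illustrative, not taken from the patch):

// Hedged sketch: assumes the Converter, Schema and SchemaAndValue classes shown in this diff,
// plus java.util.Collections.
Converter converter = new JsonConverter();
converter.configure(Collections.singletonMap("schemas.enable", true), false); // false = value-side converter

// Copycat data -> bytes ready to hand to the producer
byte[] serialized = converter.fromCopycatData("example-topic", Schema.STRING_SCHEMA, "hello");

// Bytes from the consumer -> schema plus value
SchemaAndValue restored = converter.toCopycatData("example-topic", serialized);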
@@ -18,10 +18,6 @@ bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.copycat.json.JsonConverter
value.converter=org.apache.kafka.copycat.json.JsonConverter
key.serializer=org.apache.kafka.copycat.json.JsonSerializer
value.serializer=org.apache.kafka.copycat.json.JsonSerializer
key.deserializer=org.apache.kafka.copycat.json.JsonDeserializer
value.deserializer=org.apache.kafka.copycat.json.JsonDeserializer

offset.storage.file.filename=/tmp/copycat.offsets
# Flush much faster than normal, which is useful for testing/debugging
@@ -23,20 +23,34 @@ import java.nio.ByteBuffer;
import java.util.*;

public class CopycatSchema implements Schema {
private static final Map<Type, Class<?>> SCHEMA_TYPE_CLASSES = new HashMap<>();
/**
* Maps Schema.Types to a list of Java classes that can be used to represent them.
*/
private static final Map<Type, List<Class>> SCHEMA_TYPE_CLASSES = new HashMap<>();

/**
* Maps the Java classes to the corresponding Schema.Type.
*/
private static final Map<Class<?>, Type> JAVA_CLASS_SCHEMA_TYPES = new HashMap<>();

static {
SCHEMA_TYPE_CLASSES.put(Type.INT8, Byte.class);
SCHEMA_TYPE_CLASSES.put(Type.INT16, Short.class);
SCHEMA_TYPE_CLASSES.put(Type.INT32, Integer.class);
SCHEMA_TYPE_CLASSES.put(Type.INT64, Long.class);
SCHEMA_TYPE_CLASSES.put(Type.FLOAT32, Float.class);
SCHEMA_TYPE_CLASSES.put(Type.FLOAT64, Double.class);
SCHEMA_TYPE_CLASSES.put(Type.BOOLEAN, Boolean.class);
SCHEMA_TYPE_CLASSES.put(Type.STRING, String.class);
SCHEMA_TYPE_CLASSES.put(Type.ARRAY, List.class);
SCHEMA_TYPE_CLASSES.put(Type.MAP, Map.class);
SCHEMA_TYPE_CLASSES.put(Type.STRUCT, Struct.class);
// Bytes are handled as a special case
SCHEMA_TYPE_CLASSES.put(Type.INT8, Arrays.asList((Class) Byte.class));
SCHEMA_TYPE_CLASSES.put(Type.INT16, Arrays.asList((Class) Short.class));
SCHEMA_TYPE_CLASSES.put(Type.INT32, Arrays.asList((Class) Integer.class));
SCHEMA_TYPE_CLASSES.put(Type.INT64, Arrays.asList((Class) Long.class));
SCHEMA_TYPE_CLASSES.put(Type.FLOAT32, Arrays.asList((Class) Float.class));
SCHEMA_TYPE_CLASSES.put(Type.FLOAT64, Arrays.asList((Class) Double.class));
SCHEMA_TYPE_CLASSES.put(Type.BOOLEAN, Arrays.asList((Class) Boolean.class));
SCHEMA_TYPE_CLASSES.put(Type.STRING, Arrays.asList((Class) String.class));
SCHEMA_TYPE_CLASSES.put(Type.BYTES, Arrays.asList((Class) byte[].class, (Class) ByteBuffer.class));
SCHEMA_TYPE_CLASSES.put(Type.ARRAY, Arrays.asList((Class) List.class));
SCHEMA_TYPE_CLASSES.put(Type.MAP, Arrays.asList((Class) Map.class));
SCHEMA_TYPE_CLASSES.put(Type.STRUCT, Arrays.asList((Class) Struct.class));

for (Map.Entry<Type, List<Class>> schemaClasses : SCHEMA_TYPE_CLASSES.entrySet()) {
for (Class<?> schemaClass : schemaClasses.getValue())
JAVA_CLASS_SCHEMA_TYPES.put(schemaClass, schemaClasses.getKey());
}
}

// The type of the field
@@ -161,11 +175,19 @@ public class CopycatSchema implements Schema {
// Special case for bytes. byte[] causes problems because it doesn't handle equals()/hashCode() like we want
// objects to, so we support both byte[] and ByteBuffer. Using plain byte[] can cause those methods to fail, so
// ByteBuffers are recommended
if (schema.type() == Type.BYTES && (value instanceof byte[] || value instanceof ByteBuffer))
return;
Class<?> expectedClass = SCHEMA_TYPE_CLASSES.get(schema.type());
if (expectedClass == null || !expectedClass.isInstance(value))
throw new DataException("Invalid value: expected " + expectedClass + " for type " + schema.type() + " but tried to use " + value.getClass());
final List<Class> expectedClasses = SCHEMA_TYPE_CLASSES.get(schema.type());
if (expectedClasses == null)
throw new DataException("Invalid Java object for schema type " + schema.type() + ": " + value.getClass());

boolean foundMatch = false;
for (Class<?> expectedClass : expectedClasses) {
if (expectedClass.isInstance(value)) {
foundMatch = true;
break;
}
}
if (!foundMatch)
throw new DataException("Invalid Java object for schema type " + schema.type() + ": " + value.getClass());

switch (schema.type()) {
case STRUCT:
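The validation above now scans a list of candidate classes per schema type instead of checking a single class, which is what lets Type.BYTES accept either a byte[] or a ByteBuffer. The same pattern in isolation, as a rough sketch (the helper name is illustrative, not part of the patch):

// Illustrative sketch of the lookup-then-scan check performed by validateValue.
static boolean matchesAny(Object value, List<Class<?>> allowedClasses) {
    for (Class<?> candidate : allowedClasses) {
        if (candidate.isInstance(value))
            return true; // e.g. both byte[] and ByteBuffer satisfy Type.BYTES
    }
    return false;
}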
@@ -232,4 +254,32 @@ public class CopycatSchema implements Schema {
else
return "Schema{" + type + "}";
}


/**
* Get the {@link Type} associated with the given class.
*
* @param klass the Class to look up
* @return the corresponding type, or null if there is no matching type
*/
public static Type schemaType(Class<?> klass) {
synchronized (JAVA_CLASS_SCHEMA_TYPES) {
Type schemaType = JAVA_CLASS_SCHEMA_TYPES.get(klass);
if (schemaType != null)
return schemaType;

// Since the lookup only checks the exact class, we also need to try each registered class and see
// whether the given class is a subclass of it.
for (Map.Entry<Class<?>, Type> entry : JAVA_CLASS_SCHEMA_TYPES.entrySet()) {
try {
klass.asSubclass(entry.getKey());
// Cache this for subsequent lookups
JAVA_CLASS_SCHEMA_TYPES.put(klass, entry.getValue());
return entry.getValue();
} catch (ClassCastException e) {
// Expected, ignore
}
}
}
return null;
}
}
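Because connectors hand over concrete classes such as ArrayList or HashMap, the lookup also accepts subclasses of the registered classes and caches the result for subsequent calls. A standalone paraphrase of that idea (names are illustrative, not the patch's own):

// Illustrative: subclass-aware, self-caching lookup similar to schemaType(Class) above.
static <T> T lookupBySubclass(Map<Class<?>, T> registry, Class<?> klass) {
    T direct = registry.get(klass);
    if (direct != null)
        return direct;
    for (Map.Entry<Class<?>, T> entry : registry.entrySet()) {
        if (entry.getKey().isAssignableFrom(klass)) {
            T match = entry.getValue();
            registry.put(klass, match); // cache for the next caller
            return match;
        }
    }
    return null;
}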
@@ -92,7 +92,6 @@ public interface Schema {
Schema OPTIONAL_STRING_SCHEMA = SchemaBuilder.string().optional().build();
Schema OPTIONAL_BYTES_SCHEMA = SchemaBuilder.bytes().optional().build();


/**
* @return the type of this schema
*/
@@ -21,6 +21,7 @@ import org.apache.kafka.copycat.errors.DataException;
import org.apache.kafka.copycat.errors.SchemaBuilderException;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**

@@ -346,7 +347,7 @@ public class SchemaBuilder implements Schema {
* @return the {@link Schema}
*/
public Schema build() {
return new CopycatSchema(type, isOptional(), defaultValue, name, version, doc, fields, keySchema, valueSchema);
return new CopycatSchema(type, isOptional(), defaultValue, name, version, doc, fields == null ? null : Collections.unmodifiableList(fields), keySchema, valueSchema);
}

/**
@@ -21,28 +21,37 @@ import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.copycat.data.Schema;
import org.apache.kafka.copycat.data.SchemaAndValue;

import java.util.Map;

/**
* The Converter interface provides support for translating between Copycat's runtime data format
* and the "native" runtime format used by the serialization layer. This is used to translate
* two types of data: records and offsets. The (de)serialization is performed by a separate
* component -- the producer or consumer serializer or deserializer for records or a Copycat
* serializer or deserializer for offsets.
* and byte[]. Internally, this likely includes an intermediate step to the format used by the serialization
* layer (e.g. JsonNode, GenericRecord, Message).
*/
@InterfaceStability.Unstable
public interface Converter<T> {
public interface Converter {

/**
* Configure this class.
* @param configs configs in key/value pairs
* @param isKey whether the converter is handling the key or the value of a record
*/
void configure(Map<String, ?> configs, boolean isKey);

/**
* Convert a Copycat data object to a native object for serialization.
* @param topic the topic associated with the data
* @param schema the schema for the value
* @param value the value to convert
* @return the converted value
*/
T fromCopycatData(Schema schema, Object value);
byte[] fromCopycatData(String topic, Schema schema, Object value);

/**
* Convert a native object to a Copycat data object.
* @param topic the topic associated with the data
* @param value the value to convert
* @return an object containing the {@link Schema} and the converted value
*/
SchemaAndValue toCopycatData(T value);
SchemaAndValue toCopycatData(String topic, byte[] value);
}
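Any serialization format can plug in by implementing these two conversion methods plus configure. As a purely hypothetical example that is not part of this patch, a converter treating every value as a UTF-8 string could look roughly like this:

// Hypothetical example, not in this commit: a Converter for plain UTF-8 strings.
// Assumes java.nio.charset.StandardCharsets is imported alongside the Copycat data classes.
public class StringConverter implements Converter {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure in this sketch
    }

    @Override
    public byte[] fromCopycatData(String topic, Schema schema, Object value) {
        return value == null ? null : value.toString().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public SchemaAndValue toCopycatData(String topic, byte[] value) {
        if (value == null)
            return SchemaAndValue.NULL;
        return new SchemaAndValue(Schema.OPTIONAL_STRING_SCHEMA, new String(value, StandardCharsets.UTF_8));
    }
}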
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.copycat.data.*;
import org.apache.kafka.copycat.errors.DataException;
import org.apache.kafka.copycat.storage.Converter;

@@ -32,7 +33,9 @@ import java.util.*;
/**
* Implementation of Converter that uses JSON to store schemas and objects.
*/
public class JsonConverter implements Converter<JsonNode> {
public class JsonConverter implements Converter {
private static final String SCHEMAS_ENABLE_CONFIG = "schemas.enable";
private static final boolean SCHEMAS_ENABLE_DEFAULT = true;

private static final HashMap<Schema.Type, JsonToCopycatTypeConverter> TO_COPYCAT_CONVERTERS = new HashMap<>();

@@ -117,9 +120,7 @@ public class JsonConverter implements Converter<JsonNode> {
public Object convert(Schema schema, JsonNode value) {
if (value.isNull()) return checkOptionalAndDefault(schema);

Schema elemSchema = schema.valueSchema();
if (elemSchema == null)
throw new DataException("Array schema did not specify the element type");
Schema elemSchema = schema == null ? null : schema.valueSchema();
ArrayList<Object> result = new ArrayList<>();
for (JsonNode elem : value) {
result.add(convertToCopycat(elemSchema, elem));
@@ -132,13 +133,14 @@ public class JsonConverter implements Converter<JsonNode> {
public Object convert(Schema schema, JsonNode value) {
if (value.isNull()) return checkOptionalAndDefault(schema);

Schema keySchema = schema.keySchema();
Schema valueSchema = schema.valueSchema();
Schema keySchema = schema == null ? null : schema.keySchema();
Schema valueSchema = schema == null ? null : schema.valueSchema();

// If the map uses strings for keys, it should be encoded in the natural JSON format. If it uses other
// primitive types or a complex type as a key, it will be encoded as a list of pairs
// primitive types or a complex type as a key, it will be encoded as a list of pairs. If we don't have a
// schema, we default to encoding in a Map.
Map<Object, Object> result = new HashMap<>();
if (keySchema.type() == Schema.Type.STRING) {
if (schema == null || keySchema.type() == Schema.Type.STRING) {
if (!value.isObject())
throw new DataException("Maps with string fields should be encoded as JSON objects, but found " + value.getNodeType());
Iterator<Map.Entry<String, JsonNode>> fieldIt = value.fields();
@@ -182,24 +184,73 @@ public class JsonConverter implements Converter<JsonNode> {
}
});


}

private boolean enableSchemas = SCHEMAS_ENABLE_DEFAULT;

private final JsonSerializer serializer = new JsonSerializer();
private final JsonDeserializer deserializer = new JsonDeserializer();

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
Object enableConfigsVal = configs.get(SCHEMAS_ENABLE_CONFIG);
if (enableConfigsVal != null)
enableSchemas = enableConfigsVal.toString().equals("true");

serializer.configure(configs, isKey);
deserializer.configure(configs, isKey);
}

@Override
public JsonNode fromCopycatData(Schema schema, Object value) {
return convertToJsonWithSchemaEnvelope(schema, value);
public byte[] fromCopycatData(String topic, Schema schema, Object value) {
JsonNode jsonValue = enableSchemas ? convertToJsonWithEnvelope(schema, value) : convertToJsonWithoutEnvelope(schema, value);
try {
return serializer.serialize(topic, jsonValue);
} catch (SerializationException e) {
throw new DataException("Converting Copycat data to byte[] failed due to serialization error: ", e);
}
}

@Override
public SchemaAndValue toCopycatData(JsonNode value) {
if (!value.isObject() || value.size() != 2 || !value.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !value.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))
public SchemaAndValue toCopycatData(String topic, byte[] value) {
JsonNode jsonValue;
try {
jsonValue = deserializer.deserialize(topic, value);
} catch (SerializationException e) {
throw new DataException("Converting byte[] to Copycat data failed due to serialization error: ", e);
}

if (enableSchemas && (jsonValue == null || !jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has("schema") || !jsonValue.has("payload")))
throw new DataException("JsonDeserializer with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields");

// The deserialized data should either be an envelope object containing the schema and the payload or the schema
// was stripped during serialization and we need to fill in an all-encompassing schema.
if (!enableSchemas) {
ObjectNode envelope = JsonNodeFactory.instance.objectNode();
envelope.set("schema", null);
envelope.set("payload", jsonValue);
jsonValue = envelope;
}

return jsonToCopycat(jsonValue);
}

private SchemaAndValue jsonToCopycat(JsonNode jsonValue) {
if (jsonValue == null)
return SchemaAndValue.NULL;

if (!jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !jsonValue.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))
throw new DataException("JSON value converted to Copycat must be in envelope containing schema");

Schema schema = asCopycatSchema(value.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
return new SchemaAndValue(schema, convertToCopycat(schema, value.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)));
Schema schema = asCopycatSchema(jsonValue.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
return new SchemaAndValue(schema, convertToCopycat(schema, jsonValue.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)));
}


private static ObjectNode asJsonSchema(Schema schema) {
if (schema == null)
return null;

final ObjectNode jsonSchema;
switch (schema.type()) {
case BOOLEAN:
@ -369,16 +420,22 @@ public class JsonConverter implements Converter<JsonNode> {
|
|||
* @param value the value
|
||||
* @return JsonNode-encoded version
|
||||
*/
|
||||
private static JsonNode convertToJsonWithSchemaEnvelope(Schema schema, Object value) {
|
||||
private static JsonNode convertToJsonWithEnvelope(Schema schema, Object value) {
|
||||
return new JsonSchema.Envelope(asJsonSchema(schema), convertToJson(schema, value)).toJsonNode();
|
||||
}
|
||||
|
||||
private static JsonNode convertToJsonWithoutEnvelope(Schema schema, Object value) {
|
||||
return convertToJson(schema, value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert this object, in the org.apache.kafka.copycat.data format, into a JSON object, returning both the schema
|
||||
* and the converted object.
|
||||
*/
|
||||
private static JsonNode convertToJson(Schema schema, Object value) {
|
||||
if (value == null) {
|
||||
if (schema == null) // Any schema is valid and we don't have a default, so treat this as an optional schema
|
||||
return null;
|
||||
if (schema.defaultValue() != null)
|
||||
return convertToJson(schema, schema.defaultValue());
|
||||
if (schema.isOptional())
|
||||
|
@@ -386,85 +443,141 @@ public class JsonConverter implements Converter<JsonNode> {
throw new DataException("Conversion error: null value for field that is required and has no default value");
}

switch (schema.type()) {
case INT8:
return JsonNodeFactory.instance.numberNode((Byte) value);
case INT16:
return JsonNodeFactory.instance.numberNode((Short) value);
case INT32:
return JsonNodeFactory.instance.numberNode((Integer) value);
case INT64:
return JsonNodeFactory.instance.numberNode((Long) value);
case FLOAT32:
return JsonNodeFactory.instance.numberNode((Float) value);
case FLOAT64:
return JsonNodeFactory.instance.numberNode((Double) value);
case BOOLEAN:
return JsonNodeFactory.instance.booleanNode((Boolean) value);
case STRING:
CharSequence charSeq = (CharSequence) value;
return JsonNodeFactory.instance.textNode(charSeq.toString());
case BYTES:
if (value instanceof byte[])
return JsonNodeFactory.instance.binaryNode((byte[]) value);
else if (value instanceof ByteBuffer)
return JsonNodeFactory.instance.binaryNode(((ByteBuffer) value).array());
else
throw new DataException("Invalid type for bytes type: " + value.getClass());
case ARRAY: {
if (!(value instanceof Collection))
throw new DataException("Invalid type for array type: " + value.getClass());
Collection collection = (Collection) value;
ArrayNode list = JsonNodeFactory.instance.arrayNode();
for (Object elem : collection) {
JsonNode fieldValue = convertToJson(schema.valueSchema(), elem);
list.add(fieldValue);
}
return list;
try {
final Schema.Type schemaType;
if (schema == null) {
schemaType = CopycatSchema.schemaType(value.getClass());
if (schemaType == null)
throw new DataException("Java class " + value.getClass() + " does not have corresponding schema type.");
} else {
schemaType = schema.type();
}
case MAP: {
if (!(value instanceof Map))
throw new DataException("Invalid type for array type: " + value.getClass());
Map<?, ?> map = (Map<?, ?>) value;
// If true, using string keys and JSON object; if false, using non-string keys and Array-encoding
boolean objectMode = schema.keySchema().type() == Schema.Type.STRING;
ObjectNode obj = null;
ArrayNode list = null;
if (objectMode)
obj = JsonNodeFactory.instance.objectNode();
else
list = JsonNodeFactory.instance.arrayNode();
for (Map.Entry<?, ?> entry : map.entrySet()) {
JsonNode mapKey = convertToJson(schema.keySchema(), entry.getKey());
JsonNode mapValue = convertToJson(schema.valueSchema(), entry.getValue());

if (objectMode)
obj.set(mapKey.asText(), mapValue);
switch (schemaType) {
case INT8:
return JsonNodeFactory.instance.numberNode((Byte) value);
case INT16:
return JsonNodeFactory.instance.numberNode((Short) value);
case INT32:
return JsonNodeFactory.instance.numberNode((Integer) value);
case INT64:
return JsonNodeFactory.instance.numberNode((Long) value);
case FLOAT32:
return JsonNodeFactory.instance.numberNode((Float) value);
case FLOAT64:
return JsonNodeFactory.instance.numberNode((Double) value);
case BOOLEAN:
return JsonNodeFactory.instance.booleanNode((Boolean) value);
case STRING:
CharSequence charSeq = (CharSequence) value;
return JsonNodeFactory.instance.textNode(charSeq.toString());
case BYTES:
if (value instanceof byte[])
return JsonNodeFactory.instance.binaryNode((byte[]) value);
else if (value instanceof ByteBuffer)
return JsonNodeFactory.instance.binaryNode(((ByteBuffer) value).array());
else
list.add(JsonNodeFactory.instance.arrayNode().add(mapKey).add(mapValue));
throw new DataException("Invalid type for bytes type: " + value.getClass());
case ARRAY: {
Collection collection = (Collection) value;
ArrayNode list = JsonNodeFactory.instance.arrayNode();
for (Object elem : collection) {
Schema valueSchema = schema == null ? null : schema.valueSchema();
JsonNode fieldValue = convertToJson(valueSchema, elem);
list.add(fieldValue);
}
return list;
}
return objectMode ? obj : list;
}
case STRUCT: {
if (!(value instanceof Struct))
throw new DataException("Invalid type for struct type: " + value.getClass());
Struct struct = (Struct) value;
if (struct.schema() != schema)
throw new DataException("Mismatching schema.");
ObjectNode obj = JsonNodeFactory.instance.objectNode();
for (Field field : schema.fields()) {
obj.set(field.name(), convertToJson(field.schema(), struct.get(field)));
}
return obj;
}
}
case MAP: {
Map<?, ?> map = (Map<?, ?>) value;
// If true, using string keys and JSON object; if false, using non-string keys and Array-encoding
boolean objectMode;
if (schema == null) {
objectMode = true;
for (Map.Entry<?, ?> entry : map.entrySet()) {
if (!(entry.getKey() instanceof String)) {
objectMode = false;
break;
}
}
} else {
objectMode = schema.keySchema().type() == Schema.Type.STRING;
}
ObjectNode obj = null;
ArrayNode list = null;
if (objectMode)
obj = JsonNodeFactory.instance.objectNode();
else
list = JsonNodeFactory.instance.arrayNode();
for (Map.Entry<?, ?> entry : map.entrySet()) {
Schema keySchema = schema == null ? null : schema.keySchema();
Schema valueSchema = schema == null ? null : schema.valueSchema();
JsonNode mapKey = convertToJson(keySchema, entry.getKey());
JsonNode mapValue = convertToJson(valueSchema, entry.getValue());

throw new DataException("Couldn't convert " + value + " to JSON.");
if (objectMode)
obj.set(mapKey.asText(), mapValue);
else
list.add(JsonNodeFactory.instance.arrayNode().add(mapKey).add(mapValue));
}
return objectMode ? obj : list;
}
case STRUCT: {
Struct struct = (Struct) value;
if (struct.schema() != schema)
throw new DataException("Mismatching schema.");
ObjectNode obj = JsonNodeFactory.instance.objectNode();
for (Field field : schema.fields()) {
obj.set(field.name(), convertToJson(field.schema(), struct.get(field)));
}
return obj;
}
}

throw new DataException("Couldn't convert " + value + " to JSON.");
} catch (ClassCastException e) {
throw new DataException("Invalid type for " + schema.type() + ": " + value.getClass());
}
}


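Non-string map keys cannot become JSON object field names, so such maps are written as a list of [key, value] pairs, as the tests further down verify. A hedged illustration, reusing a converter configured as in the earlier sketch (the topic name is made up):

Map<Integer, Integer> input = new HashMap<>();
input.put(1, 12);
input.put(2, 15);
byte[] out = converter.fromCopycatData("example-topic", SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).build(), input);
// out decodes to: { "schema": { "type": "map", "keys": { "type": "int32", "optional": false },
//                               "values": { "type": "int32", "optional": false }, "optional": false },
//                   "payload": [ [1, 12], [2, 15] ] }   (the order of the pairs is not guaranteed)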
private static Object convertToCopycat(Schema schema, JsonNode jsonValue) {
JsonToCopycatTypeConverter typeConverter = TO_COPYCAT_CONVERTERS.get(schema.type());
JsonToCopycatTypeConverter typeConverter;
final Schema.Type schemaType;
if (schema != null) {
schemaType = schema.type();
} else {
switch (jsonValue.getNodeType()) {
case NULL:
// Special case. With no schema
return null;
case BOOLEAN:
schemaType = Schema.Type.BOOLEAN;
break;
case NUMBER:
if (jsonValue.isIntegralNumber())
schemaType = Schema.Type.INT64;
else
schemaType = Schema.Type.FLOAT64;
break;
case ARRAY:
schemaType = Schema.Type.ARRAY;
break;
case OBJECT:
schemaType = Schema.Type.MAP;
break;
case STRING:
schemaType = Schema.Type.STRING;
break;

case BINARY:
case MISSING:
case POJO:
default:
schemaType = null;
break;
}
}
typeConverter = TO_COPYCAT_CONVERTERS.get(schemaType);
if (typeConverter == null)
throw new DataException("Unknown schema type: " + schema.type());

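When no schema accompanies the payload, the converter infers a type from the JSON node itself, always widening numbers to INT64/FLOAT64. A short sketch of what that looks like from the caller's side (topic and payload are made up):

// Illustrative: reading schemaless JSON, as exercised by the new tests below.
JsonConverter schemaless = new JsonConverter();
schemaless.configure(Collections.singletonMap("schemas.enable", false), false);

SchemaAndValue v = schemaless.toCopycatData("example-topic", "{\"count\": 12}".getBytes());
// v.schema() is null; v.value() is a Map containing {"count" -> 12L}, since integers widen to INT64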
@@ -18,9 +18,6 @@ package org.apache.kafka.copycat.json;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;


@@ -31,22 +28,6 @@ import java.util.Map;
* structured data without having associated Java classes. This deserializer also supports Copycat schemas.
*/
public class JsonDeserializer implements Deserializer<JsonNode> {
private static final ObjectNode CATCH_ALL_OBJECT_SCHEMA = JsonNodeFactory.instance.objectNode();
private static final ObjectNode CATCH_ALL_ARRAY_SCHEMA = JsonNodeFactory.instance.objectNode();
private static final ArrayNode ALL_SCHEMAS_LIST = JsonNodeFactory.instance.arrayNode();
private static final ObjectNode CATCH_ALL_SCHEMA = JsonNodeFactory.instance.objectNode();
static {
CATCH_ALL_OBJECT_SCHEMA.put("type", "object")
.putArray("field").add(JsonNodeFactory.instance.objectNode().put("*", "all"));

CATCH_ALL_ARRAY_SCHEMA.put("type", "array").put("items", "all");

ALL_SCHEMAS_LIST.add("boolean").add("int").add("long").add("float").add("double").add("bytes").add("string")
.add(CATCH_ALL_ARRAY_SCHEMA).add(CATCH_ALL_OBJECT_SCHEMA);

CATCH_ALL_SCHEMA.put("name", "all").set("type", ALL_SCHEMAS_LIST);
}

private ObjectMapper objectMapper = new ObjectMapper();

/**

@@ -61,6 +42,9 @@ public class JsonDeserializer implements Deserializer<JsonNode> {

@Override
public JsonNode deserialize(String topic, byte[] bytes) {
if (bytes == null)
return null;

JsonNode data;
try {
data = objectMapper.readTree(bytes);

@@ -68,15 +52,6 @@ public class JsonDeserializer implements Deserializer<JsonNode> {
throw new SerializationException(e);
}

// The deserialized data should either be an envelope object containing the schema and the payload or the schema
// was stripped during serialization and we need to fill in an all-encompassing schema.
if (!data.isObject() || data.size() != 2 || !data.has("schema") || !data.has("payload")) {
ObjectNode envelope = JsonNodeFactory.instance.objectNode();
envelope.set("schema", CATCH_ALL_SCHEMA);
envelope.set("payload", data);
data = envelope;
}

return data;
}

@@ -28,12 +28,7 @@ import java.util.Map;
* structured data without corresponding Java classes. This serializer also supports Copycat schemas.
*/
public class JsonSerializer implements Serializer<JsonNode> {

private static final String SCHEMAS_ENABLE_CONFIG = "schemas.enable";
private static final boolean SCHEMAS_ENABLE_DEFAULT = true;

private final ObjectMapper objectMapper = new ObjectMapper();
private boolean enableSchemas = SCHEMAS_ENABLE_DEFAULT;

/**
* Default constructor needed by Kafka

@@ -44,9 +39,6 @@ public class JsonSerializer implements Serializer<JsonNode> {

@Override
public void configure(Map<String, ?> config, boolean isKey) {
Object enableConfigsVal = config.get(SCHEMAS_ENABLE_CONFIG);
if (enableConfigsVal != null)
enableSchemas = enableConfigsVal.toString().equals("true");
}

@Override

@@ -54,14 +46,7 @@ public class JsonSerializer implements Serializer<JsonNode> {
if (data == null)
return null;

// This serializer works for Copycat data that requires a schema to be included, so we expect it to have a
// specific format: { "schema": {...}, "payload": ... }.
if (!data.isObject() || data.size() != 2 || !data.has("schema") || !data.has("payload"))
throw new SerializationException("JsonSerializer requires \"schema\" and \"payload\" fields and may not contain additional fields");

try {
if (!enableSchemas)
data = data.get("payload");
return objectMapper.writeValueAsBytes(data);
} catch (Exception e) {
throw new SerializationException("Error serializing JSON message", e);
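After this change the JSON serializer and deserializer are reduced to thin JsonNode-to-byte[] wrappers; all envelope and schemas.enable handling moves up into JsonConverter. A hedged sketch of what remains of the serialize path once the removed lines above are gone (simplified, not the literal file contents):

// Simplified sketch of the slimmed-down serializer; it no longer knows about envelopes.
public byte[] serialize(String topic, JsonNode data) {
    if (data == null)
        return null;
    try {
        return objectMapper.writeValueAsBytes(data);
    } catch (Exception e) {
        throw new SerializationException("Error serializing JSON message", e);
    }
}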
@@ -19,19 +19,19 @@ package org.apache.kafka.copycat.json;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import org.apache.kafka.copycat.data.Schema;
import org.apache.kafka.copycat.data.SchemaAndValue;
import org.apache.kafka.copycat.data.SchemaBuilder;
import org.apache.kafka.copycat.data.Struct;
import org.apache.kafka.copycat.errors.DataException;
import org.junit.Test;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.*;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

@@ -39,6 +39,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class JsonConverterTest {
private static final String TOPIC = "topic";

ObjectMapper objectMapper = new ObjectMapper();
JsonConverter converter = new JsonConverter();
@ -48,51 +49,51 @@ public class JsonConverterTest {
|
|||
@Test
|
||||
public void testCopycatSchemaMetadataTranslation() {
|
||||
// this validates the non-type fields are translated and handled properly
|
||||
assertEquals(new SchemaAndValue(Schema.BOOLEAN_SCHEMA, true), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"boolean\" }, \"payload\": true }")));
|
||||
assertEquals(new SchemaAndValue(Schema.OPTIONAL_BOOLEAN_SCHEMA, null), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"boolean\", \"optional\": true }, \"payload\": null }")));
|
||||
assertEquals(new SchemaAndValue(Schema.BOOLEAN_SCHEMA, true), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\" }, \"payload\": true }".getBytes()));
|
||||
assertEquals(new SchemaAndValue(Schema.OPTIONAL_BOOLEAN_SCHEMA, null), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", \"optional\": true }, \"payload\": null }".getBytes()));
|
||||
assertEquals(new SchemaAndValue(SchemaBuilder.bool().defaultValue(true).build(), true),
|
||||
converter.toCopycatData(parse("{ \"schema\": { \"type\": \"boolean\", \"default\": true }, \"payload\": null }")));
|
||||
converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", \"default\": true }, \"payload\": null }".getBytes()));
|
||||
assertEquals(new SchemaAndValue(SchemaBuilder.bool().required().name("bool").version(2).doc("the documentation").build(), true),
|
||||
converter.toCopycatData(parse("{ \"schema\": { \"type\": \"boolean\", \"optional\": false, \"name\": \"bool\", \"version\": 2, \"doc\": \"the documentation\"}, \"payload\": true }")));
|
||||
converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", \"optional\": false, \"name\": \"bool\", \"version\": 2, \"doc\": \"the documentation\"}, \"payload\": true }".getBytes()));
|
||||
}
|
||||
|
||||
// Schema types
|
||||
|
||||
@Test
|
||||
public void booleanToCopycat() {
|
||||
assertEquals(new SchemaAndValue(Schema.BOOLEAN_SCHEMA, true), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"boolean\" }, \"payload\": true }")));
|
||||
assertEquals(new SchemaAndValue(Schema.BOOLEAN_SCHEMA, false), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"boolean\" }, \"payload\": false }")));
|
||||
assertEquals(new SchemaAndValue(Schema.BOOLEAN_SCHEMA, true), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\" }, \"payload\": true }".getBytes()));
|
||||
assertEquals(new SchemaAndValue(Schema.BOOLEAN_SCHEMA, false), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\" }, \"payload\": false }".getBytes()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void byteToCopycat() {
|
||||
assertEquals(new SchemaAndValue(Schema.INT8_SCHEMA, (byte) 12), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"int8\" }, \"payload\": 12 }")));
|
||||
assertEquals(new SchemaAndValue(Schema.INT8_SCHEMA, (byte) 12), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"int8\" }, \"payload\": 12 }".getBytes()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shortToCopycat() {
|
||||
assertEquals(new SchemaAndValue(Schema.INT16_SCHEMA, (short) 12), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"int16\" }, \"payload\": 12 }")));
|
||||
assertEquals(new SchemaAndValue(Schema.INT16_SCHEMA, (short) 12), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"int16\" }, \"payload\": 12 }".getBytes()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void intToCopycat() {
|
||||
assertEquals(new SchemaAndValue(Schema.INT32_SCHEMA, 12), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"int32\" }, \"payload\": 12 }")));
|
||||
assertEquals(new SchemaAndValue(Schema.INT32_SCHEMA, 12), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"int32\" }, \"payload\": 12 }".getBytes()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void longToCopycat() {
|
||||
assertEquals(new SchemaAndValue(Schema.INT64_SCHEMA, 12L), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"int64\" }, \"payload\": 12 }")));
|
||||
assertEquals(new SchemaAndValue(Schema.INT64_SCHEMA, 4398046511104L), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"int64\" }, \"payload\": 4398046511104 }")));
|
||||
assertEquals(new SchemaAndValue(Schema.INT64_SCHEMA, 12L), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"int64\" }, \"payload\": 12 }".getBytes()));
|
||||
assertEquals(new SchemaAndValue(Schema.INT64_SCHEMA, 4398046511104L), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"int64\" }, \"payload\": 4398046511104 }".getBytes()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void floatToCopycat() {
|
||||
assertEquals(new SchemaAndValue(Schema.FLOAT32_SCHEMA, 12.34f), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"float\" }, \"payload\": 12.34 }")));
|
||||
assertEquals(new SchemaAndValue(Schema.FLOAT32_SCHEMA, 12.34f), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"float\" }, \"payload\": 12.34 }".getBytes()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void doubleToCopycat() {
|
||||
assertEquals(new SchemaAndValue(Schema.FLOAT64_SCHEMA, 12.34), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"double\" }, \"payload\": 12.34 }")));
|
||||
assertEquals(new SchemaAndValue(Schema.FLOAT64_SCHEMA, 12.34), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"double\" }, \"payload\": 12.34 }".getBytes()));
|
||||
}
|
||||
|
||||
|
||||
|
@ -100,69 +101,105 @@ public class JsonConverterTest {
|
|||
public void bytesToCopycat() throws UnsupportedEncodingException {
|
||||
ByteBuffer reference = ByteBuffer.wrap("test-string".getBytes("UTF-8"));
|
||||
String msg = "{ \"schema\": { \"type\": \"bytes\" }, \"payload\": \"dGVzdC1zdHJpbmc=\" }";
|
||||
SchemaAndValue schemaAndValue = converter.toCopycatData(parse(msg));
|
||||
SchemaAndValue schemaAndValue = converter.toCopycatData(TOPIC, msg.getBytes());
|
||||
ByteBuffer converted = ByteBuffer.wrap((byte[]) schemaAndValue.value());
|
||||
assertEquals(reference, converted);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void stringToCopycat() {
|
||||
assertEquals(new SchemaAndValue(Schema.STRING_SCHEMA, "foo-bar-baz"), converter.toCopycatData(parse("{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\" }")));
|
||||
assertEquals(new SchemaAndValue(Schema.STRING_SCHEMA, "foo-bar-baz"), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\" }".getBytes()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void arrayToCopycat() {
|
||||
JsonNode arrayJson = parse("{ \"schema\": { \"type\": \"array\", \"items\": { \"type\" : \"int32\" } }, \"payload\": [1, 2, 3] }");
|
||||
assertEquals(new SchemaAndValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), Arrays.asList(1, 2, 3)), converter.toCopycatData(arrayJson));
|
||||
byte[] arrayJson = "{ \"schema\": { \"type\": \"array\", \"items\": { \"type\" : \"int32\" } }, \"payload\": [1, 2, 3] }".getBytes();
|
||||
assertEquals(new SchemaAndValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), Arrays.asList(1, 2, 3)), converter.toCopycatData(TOPIC, arrayJson));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mapToCopycatStringKeys() {
|
||||
JsonNode mapJson = parse("{ \"schema\": { \"type\": \"map\", \"keys\": { \"type\" : \"string\" }, \"values\": { \"type\" : \"int32\" } }, \"payload\": { \"key1\": 12, \"key2\": 15} }");
|
||||
byte[] mapJson = "{ \"schema\": { \"type\": \"map\", \"keys\": { \"type\" : \"string\" }, \"values\": { \"type\" : \"int32\" } }, \"payload\": { \"key1\": 12, \"key2\": 15} }".getBytes();
|
||||
Map<String, Integer> expected = new HashMap<>();
|
||||
expected.put("key1", 12);
|
||||
expected.put("key2", 15);
|
||||
assertEquals(new SchemaAndValue(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build(), expected), converter.toCopycatData(mapJson));
|
||||
assertEquals(new SchemaAndValue(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build(), expected), converter.toCopycatData(TOPIC, mapJson));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mapToCopycatNonStringKeys() {
|
||||
JsonNode mapJson = parse("{ \"schema\": { \"type\": \"map\", \"keys\": { \"type\" : \"int32\" }, \"values\": { \"type\" : \"int32\" } }, \"payload\": [ [1, 12], [2, 15] ] }");
|
||||
byte[] mapJson = "{ \"schema\": { \"type\": \"map\", \"keys\": { \"type\" : \"int32\" }, \"values\": { \"type\" : \"int32\" } }, \"payload\": [ [1, 12], [2, 15] ] }".getBytes();
|
||||
Map<Integer, Integer> expected = new HashMap<>();
|
||||
expected.put(1, 12);
|
||||
expected.put(2, 15);
|
||||
assertEquals(new SchemaAndValue(SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).build(), expected), converter.toCopycatData(mapJson));
|
||||
assertEquals(new SchemaAndValue(SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).build(), expected), converter.toCopycatData(TOPIC, mapJson));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void structToCopycat() {
|
||||
JsonNode structJson = parse("{ \"schema\": { \"type\": \"struct\", \"fields\": [{ \"field\": \"field1\", \"type\": \"boolean\" }, { \"field\": \"field2\", \"type\": \"string\" }] }, \"payload\": { \"field1\": true, \"field2\": \"string\" } }");
|
||||
byte[] structJson = "{ \"schema\": { \"type\": \"struct\", \"fields\": [{ \"field\": \"field1\", \"type\": \"boolean\" }, { \"field\": \"field2\", \"type\": \"string\" }] }, \"payload\": { \"field1\": true, \"field2\": \"string\" } }".getBytes();
|
||||
Schema expectedSchema = SchemaBuilder.struct().field("field1", Schema.BOOLEAN_SCHEMA).field("field2", Schema.STRING_SCHEMA).build();
|
||||
Struct expected = new Struct(expectedSchema).put("field1", true).put("field2", "string");
|
||||
SchemaAndValue converted = converter.toCopycatData(structJson);
|
||||
SchemaAndValue converted = converter.toCopycatData(TOPIC, structJson);
|
||||
assertEquals(new SchemaAndValue(expectedSchema, expected), converted);
|
||||
}
|
||||
|
||||
@Test(expected = DataException.class)
|
||||
public void nullToCopycat() {
|
||||
// When schemas are enabled, trying to decode a null should be an error -- we should *always* have the envelope
|
||||
assertEquals(SchemaAndValue.NULL, converter.toCopycatData(TOPIC, null));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void nullSchemaPrimitiveToCopycat() {
|
||||
SchemaAndValue converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, \"payload\": null }".getBytes());
|
||||
assertEquals(SchemaAndValue.NULL, converted);
|
||||
|
||||
converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, \"payload\": true }".getBytes());
|
||||
assertEquals(new SchemaAndValue(null, true), converted);
|
||||
|
||||
// Integers: Copycat has more data types, and JSON unfortunately mixes all number types. We try to preserve
|
||||
// info as best we can, so we always use the largest integer and floating point numbers we can and have Jackson
|
||||
// determine if it's an integer or not
|
||||
converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, \"payload\": 12 }".getBytes());
|
||||
assertEquals(new SchemaAndValue(null, 12L), converted);
|
||||
|
||||
converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, \"payload\": 12.24 }".getBytes());
|
||||
assertEquals(new SchemaAndValue(null, 12.24), converted);
|
||||
|
||||
converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, \"payload\": \"a string\" }".getBytes());
|
||||
assertEquals(new SchemaAndValue(null, "a string"), converted);
|
||||
|
||||
converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, \"payload\": [1, \"2\", 3] }".getBytes());
|
||||
assertEquals(new SchemaAndValue(null, Arrays.asList(1L, "2", 3L)), converted);
|
||||
|
||||
converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, \"payload\": { \"field1\": 1, \"field2\": 2} }".getBytes());
|
||||
Map<String, Long> obj = new HashMap<>();
|
||||
obj.put("field1", 1L);
|
||||
obj.put("field2", 2L);
|
||||
assertEquals(new SchemaAndValue(null, obj), converted);
|
||||
}
|
||||
|
||||
// Schema metadata
|
||||
|
||||
@Test
|
||||
public void testJsonSchemaMetadataTranslation() {
|
||||
JsonNode converted = converter.fromCopycatData(Schema.BOOLEAN_SCHEMA, true);
|
||||
JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.BOOLEAN_SCHEMA, true));
|
||||
validateEnvelope(converted);
|
||||
assertEquals(parse("{ \"type\": \"boolean\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
|
||||
assertEquals(true, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue());
|
||||
|
||||
converted = converter.fromCopycatData(Schema.OPTIONAL_BOOLEAN_SCHEMA, null);
|
||||
converted = parse(converter.fromCopycatData(TOPIC, Schema.OPTIONAL_BOOLEAN_SCHEMA, null));
|
||||
validateEnvelope(converted);
|
||||
assertEquals(parse("{ \"type\": \"boolean\", \"optional\": true }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
|
||||
assertTrue(converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).isNull());
|
||||
|
||||
converted = converter.fromCopycatData(SchemaBuilder.bool().defaultValue(true).build(), true);
|
||||
converted = parse(converter.fromCopycatData(TOPIC, SchemaBuilder.bool().defaultValue(true).build(), true));
|
||||
validateEnvelope(converted);
|
||||
assertEquals(parse("{ \"type\": \"boolean\", \"optional\": false, \"default\": true }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
|
||||
assertEquals(true, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue());
|
||||
|
||||
converted = converter.fromCopycatData(SchemaBuilder.bool().required().name("bool").version(3).doc("the documentation").build(), true);
|
||||
converted = parse(converter.fromCopycatData(TOPIC, SchemaBuilder.bool().required().name("bool").version(3).doc("the documentation").build(), true));
|
||||
validateEnvelope(converted);
|
||||
assertEquals(parse("{ \"type\": \"boolean\", \"optional\": false, \"name\": \"bool\", \"version\": 3, \"doc\": \"the documentation\"}"),
|
||||
converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
|
||||
|
@ -173,7 +210,7 @@ public class JsonConverterTest {
|
|||
|
||||
@Test
|
||||
public void booleanToJson() {
|
||||
JsonNode converted = converter.fromCopycatData(Schema.BOOLEAN_SCHEMA, true);
|
||||
JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.BOOLEAN_SCHEMA, true));
|
||||
validateEnvelope(converted);
|
||||
assertEquals(parse("{ \"type\": \"boolean\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
|
||||
assertEquals(true, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue());
|
||||
|
@ -181,7 +218,7 @@ public class JsonConverterTest {
|
|||
|
||||
@Test
|
||||
public void byteToJson() {
|
||||
JsonNode converted = converter.fromCopycatData(Schema.INT8_SCHEMA, (byte) 12);
|
||||
JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.INT8_SCHEMA, (byte) 12));
|
||||
validateEnvelope(converted);
|
||||
assertEquals(parse("{ \"type\": \"int8\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
|
||||
assertEquals(12, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).intValue());
|
||||
|
@ -189,7 +226,7 @@ public class JsonConverterTest {
|
|||
|
||||
@Test
|
||||
public void shortToJson() {
|
||||
JsonNode converted = converter.fromCopycatData(Schema.INT16_SCHEMA, (short) 12);
|
||||
JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.INT16_SCHEMA, (short) 12));
|
||||
validateEnvelope(converted);
|
||||
assertEquals(parse("{ \"type\": \"int16\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
|
||||
assertEquals(12, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).intValue());
|
||||
|
@ -197,7 +234,7 @@ public class JsonConverterTest {
|
|||
|
||||
@Test
|
||||
public void intToJson() {
|
||||
JsonNode converted = converter.fromCopycatData(Schema.INT32_SCHEMA, 12);
|
||||
JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.INT32_SCHEMA, 12));
|
||||
validateEnvelope(converted);
|
||||
assertEquals(parse("{ \"type\": \"int32\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
|
||||
assertEquals(12, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).intValue());
|
||||
|
@ -205,7 +242,7 @@ public class JsonConverterTest {
|
|||
|
||||
@Test
|
||||
public void longToJson() {
|
||||
JsonNode converted = converter.fromCopycatData(Schema.INT64_SCHEMA, 4398046511104L);
|
||||
JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.INT64_SCHEMA, 4398046511104L));
|
||||
validateEnvelope(converted);
|
||||
assertEquals(parse("{ \"type\": \"int64\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
|
||||
assertEquals(4398046511104L, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).longValue());
|
||||
|
@ -213,7 +250,7 @@ public class JsonConverterTest {
|
|||
|
||||
@Test
|
||||
public void floatToJson() {
|
||||
JsonNode converted = converter.fromCopycatData(Schema.FLOAT32_SCHEMA, 12.34f);
|
||||
JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.FLOAT32_SCHEMA, 12.34f));
|
||||
validateEnvelope(converted);
|
||||
assertEquals(parse("{ \"type\": \"float\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
|
||||
assertEquals(12.34f, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).floatValue(), 0.001);
|
||||
|
@ -221,7 +258,7 @@ public class JsonConverterTest {
|
|||
|
||||
@Test
|
||||
public void doubleToJson() {
|
||||
JsonNode converted = converter.fromCopycatData(Schema.FLOAT64_SCHEMA, 12.34);
|
||||
JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.FLOAT64_SCHEMA, 12.34));
|
||||
validateEnvelope(converted);
|
||||
assertEquals(parse("{ \"type\": \"double\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
|
||||
assertEquals(12.34, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).doubleValue(), 0.001);
|
||||
|
@ -229,7 +266,7 @@ public class JsonConverterTest {
|
|||
|
||||
@Test
|
||||
public void bytesToJson() throws IOException {
|
||||
JsonNode converted = converter.fromCopycatData(Schema.BYTES_SCHEMA, "test-string".getBytes());
|
||||
JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.BYTES_SCHEMA, "test-string".getBytes()));
|
||||
validateEnvelope(converted);
|
||||
assertEquals(parse("{ \"type\": \"bytes\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
|
||||
assertEquals(ByteBuffer.wrap("test-string".getBytes()),
|
||||
|
@ -238,7 +275,7 @@ public class JsonConverterTest {
|
|||
|
||||
@Test
|
||||
public void stringToJson() {
|
||||
JsonNode converted = converter.fromCopycatData(Schema.STRING_SCHEMA, "test-string");
|
||||
JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.STRING_SCHEMA, "test-string"));
|
||||
validateEnvelope(converted);
|
||||
assertEquals(parse("{ \"type\": \"string\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
|
||||
assertEquals("test-string", converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).textValue());
|
||||
|
@ -247,7 +284,7 @@ public class JsonConverterTest {
|
|||
@Test
|
||||
public void arrayToJson() {
|
||||
Schema int32Array = SchemaBuilder.array(Schema.INT32_SCHEMA).build();
|
||||
JsonNode converted = converter.fromCopycatData(int32Array, Arrays.asList(1, 2, 3));
|
||||
JsonNode converted = parse(converter.fromCopycatData(TOPIC, int32Array, Arrays.asList(1, 2, 3)));
|
||||
validateEnvelope(converted);
|
||||
assertEquals(parse("{ \"type\": \"array\", \"items\": { \"type\": \"int32\", \"optional\": false }, \"optional\": false }"),
|
||||
converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
|
||||
|
@ -261,7 +298,7 @@ public class JsonConverterTest {
|
|||
Map<String, Integer> input = new HashMap<>();
|
||||
input.put("key1", 12);
|
||||
input.put("key2", 15);
|
||||
JsonNode converted = converter.fromCopycatData(stringIntMap, input);
|
||||
JsonNode converted = parse(converter.fromCopycatData(TOPIC, stringIntMap, input));
|
||||
validateEnvelope(converted);
|
||||
assertEquals(parse("{ \"type\": \"map\", \"keys\": { \"type\" : \"string\", \"optional\": false }, \"values\": { \"type\" : \"int32\", \"optional\": false }, \"optional\": false }"),
|
||||
converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
|
||||
|
@ -275,21 +312,28 @@ public class JsonConverterTest {
|
|||
Map<Integer, Integer> input = new HashMap<>();
|
||||
input.put(1, 12);
|
||||
input.put(2, 15);
|
||||
JsonNode converted = converter.fromCopycatData(intIntMap, input);
|
||||
JsonNode converted = parse(converter.fromCopycatData(TOPIC, intIntMap, input));
|
||||
validateEnvelope(converted);
|
||||
assertEquals(parse("{ \"type\": \"map\", \"keys\": { \"type\" : \"int32\", \"optional\": false }, \"values\": { \"type\" : \"int32\", \"optional\": false }, \"optional\": false }"),
|
||||
converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
|
||||
assertEquals(JsonNodeFactory.instance.arrayNode()
|
||||
.add(JsonNodeFactory.instance.arrayNode().add(1).add(12))
|
||||
.add(JsonNodeFactory.instance.arrayNode().add(2).add(15)),
|
||||
converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
|
||||
|
||||
assertTrue(converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).isArray());
|
||||
ArrayNode payload = (ArrayNode) converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME);
|
||||
assertEquals(2, payload.size());
|
||||
Set<JsonNode> payloadEntries = new HashSet<>();
|
||||
for (JsonNode elem : payload)
|
||||
payloadEntries.add(elem);
|
||||
assertEquals(new HashSet<>(Arrays.asList(JsonNodeFactory.instance.arrayNode().add(1).add(12),
|
||||
JsonNodeFactory.instance.arrayNode().add(2).add(15))),
|
||||
payloadEntries
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void structToJson() {
|
||||
Schema schema = SchemaBuilder.struct().field("field1", Schema.BOOLEAN_SCHEMA).field("field2", Schema.STRING_SCHEMA).build();
|
||||
Struct input = new Struct(schema).put("field1", true).put("field2", "string");
|
||||
JsonNode converted = converter.fromCopycatData(schema, input);
JsonNode converted = parse(converter.fromCopycatData(TOPIC, schema, input));
validateEnvelope(converted);
assertEquals(parse("{ \"type\": \"struct\", \"optional\": false, \"fields\": [{ \"field\": \"field1\", \"type\": \"boolean\", \"optional\": false }, { \"field\": \"field2\", \"type\": \"string\", \"optional\": false }] }"),
converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));

@@ -299,6 +343,102 @@ public class JsonConverterTest {
converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
}

@Test
public void nullSchemaAndPrimitiveToJson() {
// This still needs to do conversion of data, null schema means "anything goes"
JsonNode converted = parse(converter.fromCopycatData(TOPIC, null, true));
validateEnvelopeNullSchema(converted);
assertTrue(converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isNull());
assertEquals(true, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue());
}

@Test
public void nullSchemaAndArrayToJson() {
// This still needs to do conversion of data, null schema means "anything goes". Make sure we mix and match
// types to verify conversion still works.
JsonNode converted = parse(converter.fromCopycatData(TOPIC, null, Arrays.asList(1, "string", true)));
validateEnvelopeNullSchema(converted);
assertTrue(converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isNull());
assertEquals(JsonNodeFactory.instance.arrayNode().add(1).add("string").add(true),
converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
}

@Test
public void nullSchemaAndMapToJson() {
// This still needs to do conversion of data, null schema means "anything goes". Make sure we mix and match
// types to verify conversion still works.
Map<String, Object> input = new HashMap<>();
input.put("key1", 12);
input.put("key2", "string");
input.put("key3", true);
JsonNode converted = parse(converter.fromCopycatData(TOPIC, null, input));
validateEnvelopeNullSchema(converted);
assertTrue(converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isNull());
assertEquals(JsonNodeFactory.instance.objectNode().put("key1", 12).put("key2", "string").put("key3", true),
converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
}

@Test
public void nullSchemaAndMapNonStringKeysToJson() {
// This still needs to do conversion of data, null schema means "anything goes". Make sure we mix and match
// types to verify conversion still works.
Map<Object, Object> input = new HashMap<>();
input.put("string", 12);
input.put(52, "string");
input.put(false, true);
JsonNode converted = parse(converter.fromCopycatData(TOPIC, null, input));
validateEnvelopeNullSchema(converted);
assertTrue(converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isNull());
assertTrue(converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).isArray());
ArrayNode payload = (ArrayNode) converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME);
assertEquals(3, payload.size());
Set<JsonNode> payloadEntries = new HashSet<>();
for (JsonNode elem : payload)
payloadEntries.add(elem);
assertEquals(new HashSet<>(Arrays.asList(JsonNodeFactory.instance.arrayNode().add("string").add(12),
JsonNodeFactory.instance.arrayNode().add(52).add("string"),
JsonNodeFactory.instance.arrayNode().add(false).add(true))),
payloadEntries
);
}

@Test(expected = DataException.class)
public void mismatchSchemaJson() {
// If we have mismatching schema info, we should properly convert to a DataException
converter.fromCopycatData(TOPIC, Schema.FLOAT64_SCHEMA, true);
}

@Test
public void noSchemaToCopycat() {
Map<String, Boolean> props = Collections.singletonMap("schemas.enable", false);
converter.configure(props, true);
assertEquals(new SchemaAndValue(null, true), converter.toCopycatData(TOPIC, "true".getBytes()));
}

@Test
public void noSchemaToJson() {
Map<String, Boolean> props = Collections.singletonMap("schemas.enable", false);
converter.configure(props, true);
JsonNode converted = parse(converter.fromCopycatData(TOPIC, null, true));
assertTrue(converted.isBoolean());
assertEquals(true, converted.booleanValue());
}

private JsonNode parse(byte[] json) {
try {
return objectMapper.readTree(json);
} catch (IOException e) {
fail("IOException during JSON parse: " + e.getMessage());
throw new RuntimeException("failed");
}
}

private JsonNode parse(String json) {
try {
return objectMapper.readTree(json);

@@ -316,4 +456,13 @@ public class JsonConverterTest {
assertTrue(env.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isObject());
assertTrue(env.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
}

private void validateEnvelopeNullSchema(JsonNode env) {
assertNotNull(env);
assertTrue(env.isObject());
assertEquals(2, env.size());
assertTrue(env.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
assertTrue(env.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isNull());
assertTrue(env.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
}
}
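The reworked tests above call the converter with a topic name and raw byte[] payloads in both directions. A minimal sketch of the Converter contract those calls imply; the method shapes are taken from the calls in this patch, while the package placement, comments, and exact modifiers are assumptions for illustration only:

import java.util.Map;
import org.apache.kafka.copycat.data.Schema;
import org.apache.kafka.copycat.data.SchemaAndValue;

public interface Converter {
    // isKey lets one implementation apply key- or value-specific settings such as "schemas.enable".
    void configure(Map<String, ?> configs, boolean isKey);

    // Copycat schema + value for a topic -> serialized bytes.
    byte[] fromCopycatData(String topic, Schema schema, Object value);

    // Serialized bytes from a topic -> schema plus decoded value.
    SchemaAndValue toCopycatData(String topic, byte[] value);
}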
@@ -58,22 +58,6 @@ public class WorkerConfig extends AbstractConfig {
public static final String VALUE_CONVERTER_CLASS_DOC =
"Converter class for value Copycat data that implements the <code>Converter</code> interface.";

public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
public static final String KEY_SERIALIZER_CLASS_DOC =
"Serializer class for key that implements the <code>Serializer</code> interface.";

public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
public static final String VALUE_SERIALIZER_CLASS_DOC =
"Serializer class for value that implements the <code>Serializer</code> interface.";

public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";
public static final String KEY_DESERIALIZER_CLASS_DOC =
"Serializer class for key that implements the <code>Deserializer</code> interface.";

public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";
public static final String VALUE_DESERIALIZER_CLASS_DOC =
"Deserializer class for value that implements the <code>Deserializer</code> interface.";

public static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG
= "task.shutdown.graceful.timeout.ms";
private static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC =

@@ -104,14 +88,6 @@ public class WorkerConfig extends AbstractConfig {
Importance.HIGH, KEY_CONVERTER_CLASS_DOC)
.define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
Importance.HIGH, VALUE_CONVERTER_CLASS_DOC)
.define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
.define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC)
.define(KEY_DESERIALIZER_CLASS_CONFIG, Type.CLASS,
Importance.HIGH, KEY_DESERIALIZER_CLASS_DOC)
.define(VALUE_DESERIALIZER_CLASS_CONFIG, Type.CLASS,
Importance.HIGH, VALUE_DESERIALIZER_CLASS_DOC)
.define(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Type.LONG,
TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT, Importance.LOW,
TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC)
@@ -17,8 +17,6 @@

package org.apache.kafka.copycat.runtime;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.clients.producer.KafkaProducer;

@@ -47,34 +45,30 @@ import java.util.Properties;
* Since each task has a dedicated thread, this is mainly just a container for them.
* </p>
*/
public class Worker<K, V> {
public class Worker {
private static final Logger log = LoggerFactory.getLogger(Worker.class);

private Time time;
private WorkerConfig config;
private Converter<K> keyConverter;
private Converter<V> valueConverter;
private Converter keyConverter;
private Converter valueConverter;
private OffsetBackingStore offsetBackingStore;
private Serializer<K> offsetKeySerializer;
private Serializer<V> offsetValueSerializer;
private Deserializer<K> offsetKeyDeserializer;
private Deserializer<V> offsetValueDeserializer;
private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>();
private KafkaProducer<K, V> producer;
private KafkaProducer<byte[], byte[]> producer;
private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;

public Worker(WorkerConfig config) {
this(new SystemTime(), config, null, null, null, null, null);
this(new SystemTime(), config, null);
}

@SuppressWarnings("unchecked")
public Worker(Time time, WorkerConfig config, OffsetBackingStore offsetBackingStore,
Serializer offsetKeySerializer, Serializer offsetValueSerializer,
Deserializer offsetKeyDeserializer, Deserializer offsetValueDeserializer) {
public Worker(Time time, WorkerConfig config, OffsetBackingStore offsetBackingStore) {
this.time = time;
this.config = config;
this.keyConverter = config.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
this.keyConverter.configure(config.originals(), true);
this.valueConverter = config.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
this.valueConverter.configure(config.originals(), false);

if (offsetBackingStore != null) {
this.offsetBackingStore = offsetBackingStore;

@@ -82,34 +76,6 @@ public class Worker<K, V> {
this.offsetBackingStore = new FileOffsetBackingStore();
this.offsetBackingStore.configure(config.originals());
}

if (offsetKeySerializer != null) {
this.offsetKeySerializer = offsetKeySerializer;
} else {
this.offsetKeySerializer = config.getConfiguredInstance(WorkerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
this.offsetKeySerializer.configure(config.originals(), true);
}

if (offsetValueSerializer != null) {
this.offsetValueSerializer = offsetValueSerializer;
} else {
this.offsetValueSerializer = config.getConfiguredInstance(WorkerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
this.offsetValueSerializer.configure(config.originals(), false);
}

if (offsetKeyDeserializer != null) {
this.offsetKeyDeserializer = offsetKeyDeserializer;
} else {
this.offsetKeyDeserializer = config.getConfiguredInstance(WorkerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
this.offsetKeyDeserializer.configure(config.originals(), true);
}

if (offsetValueDeserializer != null) {
this.offsetValueDeserializer = offsetValueDeserializer;
} else {
this.offsetValueDeserializer = config.getConfiguredInstance(WorkerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
this.offsetValueDeserializer.configure(config.originals(), false);
}
}

public void start() {

@@ -119,8 +85,8 @@ public class Worker<K, V> {

Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getClass(WorkerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getClass(WorkerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName());
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
for (String propName : unusedConfigs.stringPropertyNames()) {
producerProps.put(propName, unusedConfigs.getProperty(propName));
}

@@ -188,14 +154,14 @@ public class Worker<K, V> {
final WorkerTask workerTask;
if (task instanceof SourceTask) {
SourceTask sourceTask = (SourceTask) task;
OffsetStorageReader offsetReader = new OffsetStorageReaderImpl<>(offsetBackingStore, id.connector(),
keyConverter, valueConverter, offsetKeySerializer, offsetValueDeserializer);
OffsetStorageWriter<K, V> offsetWriter = new OffsetStorageWriter<>(offsetBackingStore, id.connector(),
keyConverter, valueConverter, offsetKeySerializer, offsetValueSerializer);
workerTask = new WorkerSourceTask<>(id, sourceTask, keyConverter, valueConverter, producer,
OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(),
keyConverter, valueConverter);
OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
keyConverter, valueConverter);
workerTask = new WorkerSourceTask(id, sourceTask, keyConverter, valueConverter, producer,
offsetReader, offsetWriter, config, time);
} else if (task instanceof SinkTask) {
workerTask = new WorkerSinkTask<>(id, (SinkTask) task, config, keyConverter, valueConverter, time);
workerTask = new WorkerSinkTask(id, (SinkTask) task, config, keyConverter, valueConverter, time);
} else {
log.error("Tasks must be a subclass of either SourceTask or SinkTask", task);
throw new CopycatException("Tasks must be a subclass of either SourceTask or SinkTask");
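Taken together, the Worker hunks above replace four pluggable (de)serializers with two converters and a producer fixed to raw bytes. A condensed sketch of the new wiring, lifted from those hunks; field declarations, error handling, and the unused-config pass-through are omitted:

Converter keyConverter = config.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
keyConverter.configure(config.originals(), true);     // true -> key converter
Converter valueConverter = config.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
valueConverter.configure(config.originals(), false);  // false -> value converter

Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
        Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
// Kafka serialization is pinned to byte arrays; the converters now own the byte[] <-> data API mapping.
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);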
@@ -38,21 +38,21 @@ import java.util.concurrent.TimeUnit;
/**
* WorkerTask that uses a SinkTask to export data from Kafka.
*/
class WorkerSinkTask<K, V> implements WorkerTask {
class WorkerSinkTask implements WorkerTask {
private static final Logger log = LoggerFactory.getLogger(WorkerSinkTask.class);

private final ConnectorTaskId id;
private final SinkTask task;
private final WorkerConfig workerConfig;
private final Time time;
private final Converter<K> keyConverter;
private final Converter<V> valueConverter;
private final Converter keyConverter;
private final Converter valueConverter;
private WorkerSinkTaskThread workThread;
private KafkaConsumer<K, V> consumer;
private KafkaConsumer<byte[], byte[]> consumer;
private final SinkTaskContext context;

public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConfig,
Converter<K> keyConverter, Converter<V> valueConverter, Time time) {
Converter keyConverter, Converter valueConverter, Time time) {
this.id = id;
this.task = task;
this.workerConfig = workerConfig;

@@ -107,7 +107,7 @@ class WorkerSinkTask<K, V> implements WorkerTask {
public void poll(long timeoutMs) {
try {
log.trace("{} polling consumer with timeout {} ms", id, timeoutMs);
ConsumerRecords<K, V> msgs = consumer.poll(timeoutMs);
ConsumerRecords<byte[], byte[]> msgs = consumer.poll(timeoutMs);
log.trace("{} polling returned {} messages", id, msgs.count());
deliverMessages(msgs);
} catch (ConsumerWakeupException we) {

@@ -154,7 +154,7 @@ class WorkerSinkTask<K, V> implements WorkerTask {
return workerConfig;
}

private KafkaConsumer<K, V> createConsumer(Properties taskProps) {
private KafkaConsumer<byte[], byte[]> createConsumer(Properties taskProps) {
String topicsStr = taskProps.getProperty(SinkTask.TOPICS_CONFIG);
if (topicsStr == null || topicsStr.isEmpty())
throw new CopycatException("Sink tasks require a list of topics.");

@@ -168,12 +168,10 @@ class WorkerSinkTask<K, V> implements WorkerTask {
Utils.join(workerConfig.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
workerConfig.getClass(WorkerConfig.KEY_DESERIALIZER_CLASS_CONFIG).getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
workerConfig.getClass(WorkerConfig.VALUE_DESERIALIZER_CLASS_CONFIG).getName());
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");

KafkaConsumer<K, V> newConsumer;
KafkaConsumer<byte[], byte[]> newConsumer;
try {
newConsumer = new KafkaConsumer<>(props);
} catch (Throwable t) {

@@ -202,14 +200,14 @@ class WorkerSinkTask<K, V> implements WorkerTask {
return new WorkerSinkTaskThread(this, "WorkerSinkTask-" + id, time, workerConfig);
}

private void deliverMessages(ConsumerRecords<K, V> msgs) {
private void deliverMessages(ConsumerRecords<byte[], byte[]> msgs) {
// Finally, deliver this batch to the sink
if (msgs.count() > 0) {
List<SinkRecord> records = new ArrayList<>();
for (ConsumerRecord<K, V> msg : msgs) {
for (ConsumerRecord<byte[], byte[]> msg : msgs) {
log.trace("Consuming message with key {}, value {}", msg.key(), msg.value());
SchemaAndValue keyAndSchema = msg.key() != null ? keyConverter.toCopycatData(msg.key()) : SchemaAndValue.NULL;
SchemaAndValue valueAndSchema = msg.value() != null ? valueConverter.toCopycatData(msg.value()) : SchemaAndValue.NULL;
SchemaAndValue keyAndSchema = keyConverter.toCopycatData(msg.topic(), msg.key());
SchemaAndValue valueAndSchema = valueConverter.toCopycatData(msg.topic(), msg.value());
records.add(
new SinkRecord(msg.topic(), msg.partition(),
keyAndSchema.schema(), keyAndSchema.value(),
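The sink path now consumes byte[]/byte[] records and hands them to the converter together with the topic name; null keys and values are no longer special-cased before conversion. A condensed sketch of the loop in deliverMessages() as changed above; the trailing SinkRecord constructor arguments beyond those visible in the hunk are an assumption:

for (ConsumerRecord<byte[], byte[]> msg : msgs) {
    // The converter decides how a null key or value maps into the data API.
    SchemaAndValue keyAndSchema = keyConverter.toCopycatData(msg.topic(), msg.key());
    SchemaAndValue valueAndSchema = valueConverter.toCopycatData(msg.topic(), msg.value());
    records.add(new SinkRecord(msg.topic(), msg.partition(),
            keyAndSchema.schema(), keyAndSchema.value(),
            valueAndSchema.schema(), valueAndSchema.value(),
            msg.offset()));
}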
@@ -46,33 +46,31 @@ import java.util.concurrent.TimeoutException;
/**
* WorkerTask that uses a SourceTask to ingest data into Kafka.
*/
class WorkerSourceTask<K, V> implements WorkerTask {
class WorkerSourceTask implements WorkerTask {
private static final Logger log = LoggerFactory.getLogger(WorkerSourceTask.class);

private ConnectorTaskId id;
private SourceTask task;
private final Converter<K> keyConverter;
private final Converter<V> valueConverter;
private KafkaProducer<K, V> producer;
private final Converter keyConverter;
private final Converter valueConverter;
private KafkaProducer<byte[], byte[]> producer;
private WorkerSourceTaskThread workThread;
private OffsetStorageReader offsetReader;
private OffsetStorageWriter<K, V> offsetWriter;
private OffsetStorageWriter offsetWriter;
private final WorkerConfig workerConfig;
private final Time time;

// Use IdentityHashMap to ensure correctness with duplicate records. This is a HashMap because
// there is no IdentityHashSet.
private IdentityHashMap<ProducerRecord<K, V>, ProducerRecord<K, V>>
outstandingMessages;
private IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> outstandingMessages;
// A second buffer is used while an offset flush is running
private IdentityHashMap<ProducerRecord<K, V>, ProducerRecord<K, V>>
outstandingMessagesBacklog;
private IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> outstandingMessagesBacklog;
private boolean flushing;

public WorkerSourceTask(ConnectorTaskId id, SourceTask task,
Converter<K> keyConverter, Converter<V> valueConverter,
KafkaProducer<K, V> producer,
OffsetStorageReader offsetReader, OffsetStorageWriter<K, V> offsetWriter,
Converter keyConverter, Converter valueConverter,
KafkaProducer<byte[], byte[]> producer,
OffsetStorageReader offsetReader, OffsetStorageWriter offsetWriter,
WorkerConfig workerConfig, Time time) {
this.id = id;
this.task = task;

@@ -132,10 +130,9 @@ class WorkerSourceTask<K, V> implements WorkerTask {
*/
private synchronized void sendRecords(List<SourceRecord> records) {
for (SourceRecord record : records) {
K key = (record.keySchema() != null) ? keyConverter.fromCopycatData(record.keySchema(), record.key()) : null;
V value = (record.valueSchema() != null) ? valueConverter.fromCopycatData(record.valueSchema(), record.value()) : null;
final ProducerRecord<K, V> producerRecord = new ProducerRecord<>(
record.topic(), record.kafkaPartition(), key, value);
byte[] key = keyConverter.fromCopycatData(record.topic(), record.keySchema(), record.key());
byte[] value = valueConverter.fromCopycatData(record.topic(), record.valueSchema(), record.value());
final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(record.topic(), record.kafkaPartition(), key, value);
log.trace("Appending record with key {}, value {}", record.key(), record.value());
if (!flushing) {
outstandingMessages.put(producerRecord, producerRecord);

@@ -163,8 +160,8 @@ class WorkerSourceTask<K, V> implements WorkerTask {
}
}

private synchronized void recordSent(final ProducerRecord<K, V> record) {
ProducerRecord<K, V> removed = outstandingMessages.remove(record);
private synchronized void recordSent(final ProducerRecord<byte[], byte[]> record) {
ProducerRecord<byte[], byte[]> removed = outstandingMessages.remove(record);
// While flushing, we may also see callbacks for items in the backlog
if (removed == null && flushing)
removed = outstandingMessagesBacklog.remove(record);

@@ -276,7 +273,7 @@ class WorkerSourceTask<K, V> implements WorkerTask {

private void finishSuccessfulFlush() {
// If we were successful, we can just swap instead of replacing items back into the original map
IdentityHashMap<ProducerRecord<K, V>, ProducerRecord<K, V>> temp = outstandingMessages;
IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> temp = outstandingMessages;
outstandingMessages = outstandingMessagesBacklog;
outstandingMessagesBacklog = temp;
flushing = false;
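On the source side the converters now serialize straight to the byte[] the producer expects, with the record's topic passed through. A condensed sketch of sendRecords() as changed above; the send callback and flush bookkeeping are omitted:

for (SourceRecord record : records) {
    // The converter handles null schemas and values itself, so no null guard is needed here.
    byte[] key = keyConverter.fromCopycatData(record.topic(), record.keySchema(), record.key());
    byte[] value = valueConverter.fromCopycatData(record.topic(), record.valueSchema(), record.value());
    ProducerRecord<byte[], byte[]> producerRecord =
            new ProducerRecord<>(record.topic(), record.kafkaPartition(), key, value);
    producer.send(producerRecord);
}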
@@ -17,8 +17,6 @@

package org.apache.kafka.copycat.storage;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.copycat.data.SchemaAndValue;
import org.apache.kafka.copycat.errors.CopycatException;
import org.slf4j.Logger;

@@ -35,25 +33,20 @@ import java.util.Map;
* directly, the interface is only separate from this implementation because it needs to be
* included in the public API package.
*/
public class OffsetStorageReaderImpl<K, V> implements OffsetStorageReader {
public class OffsetStorageReaderImpl implements OffsetStorageReader {
private static final Logger log = LoggerFactory.getLogger(OffsetStorageReaderImpl.class);

private final OffsetBackingStore backingStore;
private final String namespace;
private final Converter<K> keyConverter;
private final Converter<V> valueConverter;
private final Serializer<K> keySerializer;
private final Deserializer<V> valueDeserializer;
private final Converter keyConverter;
private final Converter valueConverter;

public OffsetStorageReaderImpl(OffsetBackingStore backingStore, String namespace,
Converter<K> keyConverter, Converter<V> valueConverter,
Serializer<K> keySerializer, Deserializer<V> valueDeserializer) {
Converter keyConverter, Converter valueConverter) {
this.backingStore = backingStore;
this.namespace = namespace;
this.keyConverter = keyConverter;
this.valueConverter = valueConverter;
this.keySerializer = keySerializer;
this.valueDeserializer = valueDeserializer;
}

@Override

@@ -67,7 +60,7 @@ public class OffsetStorageReaderImpl<K, V> implements OffsetStorageReader {
Map<ByteBuffer, SchemaAndValue> serializedToOriginal = new HashMap<>(partitions.size());
for (SchemaAndValue key : partitions) {
try {
byte[] keySerialized = keySerializer.serialize(namespace, keyConverter.fromCopycatData(key.schema(), key.value()));
byte[] keySerialized = keyConverter.fromCopycatData(namespace, key.schema(), key.value());
ByteBuffer keyBuffer = (keySerialized != null) ? ByteBuffer.wrap(keySerialized) : null;
serializedToOriginal.put(keyBuffer, key);
} catch (Throwable t) {

@@ -97,9 +90,7 @@ public class OffsetStorageReaderImpl<K, V> implements OffsetStorageReader {
continue;
}
SchemaAndValue origKey = serializedToOriginal.get(rawEntry.getKey());
SchemaAndValue deserializedValue = valueConverter.toCopycatData(
valueDeserializer.deserialize(namespace, rawEntry.getValue().array())
);
SchemaAndValue deserializedValue = valueConverter.toCopycatData(namespace, rawEntry.getValue().array());

result.put(origKey, deserializedValue);
} catch (Throwable t) {
@@ -17,7 +17,6 @@

package org.apache.kafka.copycat.storage;

import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.copycat.data.SchemaAndValue;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.util.Callback;

@@ -64,14 +63,12 @@ import java.util.concurrent.Future;
* This class is not thread-safe. It should only be accessed from a Task's processing thread.
* </p>
*/
public class OffsetStorageWriter<K, V> {
public class OffsetStorageWriter {
private static final Logger log = LoggerFactory.getLogger(OffsetStorageWriter.class);

private final OffsetBackingStore backingStore;
private final Converter<K> keyConverter;
private final Converter<V> valueConverter;
private final Serializer<K> keySerializer;
private final Serializer<V> valueSerializer;
private final Converter keyConverter;
private final Converter valueConverter;
private final String namespace;
// Offset data in Copycat format
private Map<SchemaAndValue, SchemaAndValue> data = new HashMap<>();

@@ -82,14 +79,11 @@ public class OffsetStorageWriter<K, V> {
private long currentFlushId = 0;

public OffsetStorageWriter(OffsetBackingStore backingStore,
String namespace, Converter<K> keyConverter, Converter<V> valueConverter,
Serializer<K> keySerializer, Serializer<V> valueSerializer) {
String namespace, Converter keyConverter, Converter valueConverter) {
this.backingStore = backingStore;
this.namespace = namespace;
this.keyConverter = keyConverter;
this.valueConverter = valueConverter;
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
}

/**

@@ -143,9 +137,9 @@ public class OffsetStorageWriter<K, V> {
try {
offsetsSerialized = new HashMap<>();
for (Map.Entry<SchemaAndValue, SchemaAndValue> entry : toFlush.entrySet()) {
byte[] key = keySerializer.serialize(namespace, keyConverter.fromCopycatData(entry.getKey().schema(), entry.getKey().value()));
byte[] key = keyConverter.fromCopycatData(namespace, entry.getKey().schema(), entry.getKey().value());
ByteBuffer keyBuffer = (key != null) ? ByteBuffer.wrap(key) : null;
byte[] value = valueSerializer.serialize(namespace, valueConverter.fromCopycatData(entry.getValue().schema(), entry.getValue().value()));
byte[] value = valueConverter.fromCopycatData(namespace, entry.getValue().schema(), entry.getValue().value());
ByteBuffer valueBuffer = (value != null) ? ByteBuffer.wrap(value) : null;
offsetsSerialized.put(keyBuffer, valueBuffer);
}
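Offset storage follows the same pattern: the converter output is already the raw bytes persisted by the backing store, so the extra Serializer/Deserializer hop disappears. A condensed sketch of one offset entry's round trip, using the connector namespace in place of a topic; offsetKey and offsetValue stand for arbitrary SchemaAndValue pairs and are illustrative names:

byte[] keyBytes = keyConverter.fromCopycatData(namespace, offsetKey.schema(), offsetKey.value());
byte[] valueBytes = valueConverter.fromCopycatData(namespace, offsetValue.schema(), offsetValue.value());
ByteBuffer keyBuffer = (keyBytes != null) ? ByteBuffer.wrap(keyBytes) : null;
ByteBuffer valueBuffer = (valueBytes != null) ? ByteBuffer.wrap(valueBytes) : null;
// ...written to the OffsetBackingStore, and read back later without a Deserializer:
SchemaAndValue restored = valueConverter.toCopycatData(namespace, valueBuffer.array());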
@@ -67,10 +67,10 @@ public class WorkerSinkTaskTest extends ThreadedTest {
private Time time;
@Mock private SinkTask sinkTask;
private WorkerConfig workerConfig;
@Mock private Converter<byte[]> keyConverter;
@Mock private Converter keyConverter;
@Mock
private Converter<byte[]> valueConverter;
private WorkerSinkTask<Integer, String> workerTask;
private Converter valueConverter;
private WorkerSinkTask workerTask;
@Mock private KafkaConsumer<byte[], byte[]> consumer;
private WorkerSinkTaskThread workerThread;

@@ -84,10 +84,6 @@ public class WorkerSinkTaskTest extends ThreadedTest {
Properties workerProps = new Properties();
workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
workerProps.setProperty("key.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
workerProps.setProperty("value.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
workerProps.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
workerConfig = new WorkerConfig(workerProps);
workerTask = PowerMock.createPartialMock(
WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"},

@@ -138,12 +134,12 @@ public class WorkerSinkTaskTest extends ThreadedTest {

ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
Collections.singletonMap(
new TopicPartition("topic", 0),
Collections.singletonList(new ConsumerRecord<>("topic", 0, 0, RAW_KEY, RAW_VALUE))));
new TopicPartition(TOPIC, 0),
Collections.singletonList(new ConsumerRecord<>(TOPIC, 0, 0, RAW_KEY, RAW_VALUE))));

// Exact data doesn't matter, but should be passed directly to sink task
EasyMock.expect(keyConverter.toCopycatData(RAW_KEY)).andReturn(record);
EasyMock.expect(valueConverter.toCopycatData(RAW_VALUE)).andReturn(record);
EasyMock.expect(keyConverter.toCopycatData(EasyMock.eq(TOPIC), EasyMock.aryEq(RAW_KEY))).andReturn(record);
EasyMock.expect(valueConverter.toCopycatData(EasyMock.eq(TOPIC), EasyMock.aryEq(RAW_VALUE))).andReturn(record);
Capture<Collection<SinkRecord>> capturedRecords
= EasyMock.newCapture(CaptureType.ALL);
sinkTask.put(EasyMock.capture(capturedRecords));

@@ -320,8 +316,8 @@ public class WorkerSinkTaskTest extends ThreadedTest {
return records;
}
});
EasyMock.expect(keyConverter.toCopycatData(RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).anyTimes();
EasyMock.expect(valueConverter.toCopycatData(RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).anyTimes();
EasyMock.expect(keyConverter.toCopycatData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).anyTimes();
EasyMock.expect(valueConverter.toCopycatData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).anyTimes();
Capture<Collection<SinkRecord>> capturedRecords = EasyMock.newCapture(CaptureType.ALL);
sinkTask.put(EasyMock.capture(capturedRecords));
EasyMock.expectLastCall().anyTimes();
@@ -45,7 +45,6 @@ import org.powermock.api.easymock.annotation.Mock;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

@@ -59,6 +58,7 @@ import static org.junit.Assert.*;

@RunWith(PowerMockRunner.class)
public class WorkerSourceTaskTest extends ThreadedTest {
private static final String TOPIC = "topic";
private static final Schema PARTITION_SCHEMA = Schema.BYTES_SCHEMA;
private static final byte[] PARTITION_BYTES = "partition".getBytes();
private static final Schema OFFSET_SCHEMA = Schema.BYTES_SCHEMA;

@@ -69,20 +69,20 @@ public class WorkerSourceTaskTest extends ThreadedTest {
private static final Integer KEY = -1;
private static final Schema RECORD_SCHEMA = Schema.INT64_SCHEMA;
private static final Long RECORD = 12L;
// Native-formatted data. The actual format of this data doesn't matter -- we just want to see that the right version
// Serialized data. The actual format of this data doesn't matter -- we just want to see that the right version
// is used in the right place.
private static final ByteBuffer CONVERTED_KEY = ByteBuffer.wrap("converted-key".getBytes());
private static final String CONVERTED_RECORD = "converted-record";
private static final byte[] SERIALIZED_KEY = "converted-key".getBytes();
private static final byte[] SERIALIZED_RECORD = "converted-record".getBytes();

private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
private WorkerConfig config;
@Mock private SourceTask sourceTask;
@Mock private Converter<ByteBuffer> keyConverter;
@Mock private Converter<String> valueConverter;
@Mock private KafkaProducer<ByteBuffer, String> producer;
@Mock private Converter keyConverter;
@Mock private Converter valueConverter;
@Mock private KafkaProducer<byte[], byte[]> producer;
@Mock private OffsetStorageReader offsetReader;
@Mock private OffsetStorageWriter<ByteBuffer, String> offsetWriter;
private WorkerSourceTask<ByteBuffer, String> workerTask;
@Mock private OffsetStorageWriter offsetWriter;
private WorkerSourceTask workerTask;
@Mock private Future<RecordMetadata> sendFuture;

private Capture<org.apache.kafka.clients.producer.Callback> producerCallbacks;

@@ -107,7 +107,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
}

private void createWorkerTask() {
workerTask = new WorkerSourceTask<>(taskId, sourceTask, keyConverter, valueConverter, producer,
workerTask = new WorkerSourceTask(taskId, sourceTask, keyConverter, valueConverter, producer,
offsetReader, offsetWriter, config, new SystemTime());
}

@@ -203,13 +203,13 @@ public class WorkerSourceTaskTest extends ThreadedTest {
// Can just use the same record for key and value
records.add(new SourceRecord(PARTITION_SCHEMA, PARTITION_BYTES, OFFSET_SCHEMA, OFFSET_BYTES, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD));

Capture<ProducerRecord<ByteBuffer, String>> sent = expectSendRecord();
Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecord();

PowerMock.replayAll();

Whitebox.invokeMethod(workerTask, "sendRecords", records);
assertEquals(CONVERTED_KEY, sent.getValue().key());
assertEquals(CONVERTED_RECORD, sent.getValue().value());
assertEquals(SERIALIZED_KEY, sent.getValue().key());
assertEquals(SERIALIZED_RECORD, sent.getValue().value());

PowerMock.verifyAll();
}

@@ -233,11 +233,11 @@ public class WorkerSourceTaskTest extends ThreadedTest {
return latch;
}

private Capture<ProducerRecord<ByteBuffer, String>> expectSendRecord() throws InterruptedException {
EasyMock.expect(keyConverter.fromCopycatData(KEY_SCHEMA, KEY)).andStubReturn(CONVERTED_KEY);
EasyMock.expect(valueConverter.fromCopycatData(RECORD_SCHEMA, RECORD)).andStubReturn(CONVERTED_RECORD);
private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord() throws InterruptedException {
EasyMock.expect(keyConverter.fromCopycatData(TOPIC, KEY_SCHEMA, KEY)).andStubReturn(SERIALIZED_KEY);
EasyMock.expect(valueConverter.fromCopycatData(TOPIC, RECORD_SCHEMA, RECORD)).andStubReturn(SERIALIZED_RECORD);

Capture<ProducerRecord<ByteBuffer, String>> sent = EasyMock.newCapture();
Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture();
// 1. Converted data passed to the producer, which will need callbacks invoked for flush to work
EasyMock.expect(
producer.send(EasyMock.capture(sent),
@@ -17,8 +17,6 @@

package org.apache.kafka.copycat.runtime;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.copycat.cli.WorkerConfig;

@@ -49,10 +47,6 @@ public class WorkerTest extends ThreadedTest {
private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
private Worker worker;
private OffsetBackingStore offsetBackingStore = PowerMock.createMock(OffsetBackingStore.class);
private Serializer offsetKeySerializer = PowerMock.createMock(Serializer.class);
private Serializer offsetValueSerializer = PowerMock.createMock(Serializer.class);
private Deserializer offsetKeyDeserializer = PowerMock.createMock(Deserializer.class);
private Deserializer offsetValueDeserializer = PowerMock.createMock(Deserializer.class);

@Before
public void setup() {

@@ -61,14 +55,8 @@ public class WorkerTest extends ThreadedTest {
Properties workerProps = new Properties();
workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
workerProps.setProperty("key.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
workerProps.setProperty("value.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
workerProps.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
WorkerConfig config = new WorkerConfig(workerProps);
worker = new Worker(new MockTime(), config, offsetBackingStore,
offsetKeySerializer, offsetValueSerializer,
offsetKeyDeserializer, offsetValueDeserializer);
worker = new Worker(new MockTime(), config, offsetBackingStore);
worker.start();
}
@@ -17,7 +17,6 @@

package org.apache.kafka.copycat.storage;

import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.copycat.data.Schema;
import org.apache.kafka.copycat.data.SchemaAndValue;
import org.apache.kafka.copycat.data.SchemaBuilder;

@@ -52,9 +51,6 @@ public class OffsetStorageWriterTest {
private static final List<String> OFFSET_KEY = Arrays.asList("key", "key");
private static final Schema OFFSET_VALUE_SCHEMA = Schema.STRING_SCHEMA;
private static final String OFFSET_VALUE = "value";
// Native objects - must match serializer types
private static final int OFFSET_KEY_CONVERTED = 12;
private static final String OFFSET_VALUE_CONVERTED = "value-converted";
// Serialized
private static final byte[] OFFSET_KEY_SERIALIZED = "key-serialized".getBytes();
private static final byte[] OFFSET_VALUE_SERIALIZED = "value-serialized".getBytes();

@@ -63,11 +59,9 @@ public class OffsetStorageWriterTest {
ByteBuffer.wrap(OFFSET_VALUE_SERIALIZED));

@Mock private OffsetBackingStore store;
@Mock private Converter<Integer> keyConverter;
@Mock private Converter<String> valueConverter;
@Mock private Serializer<Integer> keySerializer;
@Mock private Serializer<String> valueSerializer;
private OffsetStorageWriter<Integer, String> writer;
@Mock private Converter keyConverter;
@Mock private Converter valueConverter;
private OffsetStorageWriter writer;

private static Exception exception = new RuntimeException("error");

@@ -75,7 +69,7 @@ public class OffsetStorageWriterTest {

@Before
public void setup() {
writer = new OffsetStorageWriter<>(store, NAMESPACE, keyConverter, valueConverter, keySerializer, valueSerializer);
writer = new OffsetStorageWriter(store, NAMESPACE, keyConverter, valueConverter);
service = Executors.newFixedThreadPool(1);
}

@@ -207,10 +201,8 @@ public class OffsetStorageWriterTest {
private void expectStore(final Callback<Void> callback,
final boolean fail,
final CountDownLatch waitForCompletion) {
EasyMock.expect(keyConverter.fromCopycatData(OFFSET_KEY_SCHEMA, OFFSET_KEY)).andReturn(OFFSET_KEY_CONVERTED);
EasyMock.expect(keySerializer.serialize(NAMESPACE, OFFSET_KEY_CONVERTED)).andReturn(OFFSET_KEY_SERIALIZED);
EasyMock.expect(valueConverter.fromCopycatData(OFFSET_VALUE_SCHEMA, OFFSET_VALUE)).andReturn(OFFSET_VALUE_CONVERTED);
EasyMock.expect(valueSerializer.serialize(NAMESPACE, OFFSET_VALUE_CONVERTED)).andReturn(OFFSET_VALUE_SERIALIZED);
EasyMock.expect(keyConverter.fromCopycatData(NAMESPACE, OFFSET_KEY_SCHEMA, OFFSET_KEY)).andReturn(OFFSET_KEY_SERIALIZED);
EasyMock.expect(valueConverter.fromCopycatData(NAMESPACE, OFFSET_VALUE_SCHEMA, OFFSET_VALUE)).andReturn(OFFSET_VALUE_SERIALIZED);

final Capture<Callback<Void>> storeCallback = Capture.newInstance();
EasyMock.expect(store.set(EasyMock.eq(NAMESPACE), EasyMock.eq(OFFSETS_SERIALIZED),
@@ -17,9 +17,5 @@ bootstrap.servers={{ kafka.bootstrap_servers() }}

key.converter=org.apache.kafka.copycat.json.JsonConverter
value.converter=org.apache.kafka.copycat.json.JsonConverter
key.serializer=org.apache.kafka.copycat.json.JsonSerializer
value.serializer=org.apache.kafka.copycat.json.JsonSerializer
key.deserializer=org.apache.kafka.copycat.json.JsonDeserializer
value.deserializer=org.apache.kafka.copycat.json.JsonDeserializer

offset.storage.file.filename={{ OFFSETS_FILE }}
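With the serializer settings gone, a worker only needs converter classes on top of the broker address. A minimal setup mirroring the trimmed template above and the test configuration earlier in this patch; the broker address is illustrative:

Properties workerProps = new Properties();
workerProps.setProperty("bootstrap.servers", "localhost:9092");
workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
WorkerConfig config = new WorkerConfig(workerProps);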