KAFKA-6684: Support casting Connect values with bytes schema to string

Allow to cast LogicalType to string by calling the serialized (Java) object's toString().

Added tests for `BigDecimal` and `Date` as whole record and as fields.

Author: Amit Sela <amitsela33@gmail.com>

Reviewers: Randall Hauch <rhauch@gmail.com>, Robert Yokota <rayokota@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #4820 from amitsela/cast-transform-bytes
This commit is contained in:
Amit Sela 2018-09-30 22:24:09 -07:00 committed by Randall Hauch
parent c2af356724
commit aaa71d7e01
3 changed files with 97 additions and 23 deletions

View File

@ -707,7 +707,7 @@ public class Values {
return value.replaceAll("\\\\", "\\\\\\\\").replaceAll("\"", "\\\\\"");
}
protected static DateFormat dateFormatFor(java.util.Date value) {
public static DateFormat dateFormatFor(java.util.Date value) {
if (value.getTime() < MILLIS_PER_DAY) {
return new SimpleDateFormat(ISO_8601_TIME_FORMAT_PATTERN);
}

View File

@ -28,6 +28,7 @@ import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Values;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.transforms.util.SchemaUtil;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
@ -78,9 +79,16 @@ public abstract class Cast<R extends ConnectRecord<R>> implements Transformation
private static final String PURPOSE = "cast types";
private static final Set<Schema.Type> SUPPORTED_CAST_TYPES = EnumSet.of(
private static final Set<Schema.Type> SUPPORTED_CAST_INPUT_TYPES = EnumSet.of(
Schema.Type.INT8, Schema.Type.INT16, Schema.Type.INT32, Schema.Type.INT64,
Schema.Type.FLOAT32, Schema.Type.FLOAT64, Schema.Type.BOOLEAN, Schema.Type.STRING
Schema.Type.FLOAT32, Schema.Type.FLOAT64, Schema.Type.BOOLEAN,
Schema.Type.STRING, Schema.Type.BYTES
);
private static final Set<Schema.Type> SUPPORTED_CAST_OUTPUT_TYPES = EnumSet.of(
Schema.Type.INT8, Schema.Type.INT16, Schema.Type.INT32, Schema.Type.INT64,
Schema.Type.FLOAT32, Schema.Type.FLOAT64, Schema.Type.BOOLEAN,
Schema.Type.STRING
);
// As a special case for casting the entire value (e.g. the incoming key is a int64 but you know it could be an
@ -120,14 +128,14 @@ public abstract class Cast<R extends ConnectRecord<R>> implements Transformation
private R applySchemaless(R record) {
if (wholeValueCastType != null) {
return newRecord(record, null, castValueToType(operatingValue(record), wholeValueCastType));
return newRecord(record, null, castValueToType(null, operatingValue(record), wholeValueCastType));
}
final Map<String, Object> value = requireMap(operatingValue(record), PURPOSE);
final HashMap<String, Object> updatedValue = new HashMap<>(value);
for (Map.Entry<String, Schema.Type> fieldSpec : casts.entrySet()) {
String field = fieldSpec.getKey();
updatedValue.put(field, castValueToType(value.get(field), fieldSpec.getValue()));
updatedValue.put(field, castValueToType(null, value.get(field), fieldSpec.getValue()));
}
return newRecord(record, null, updatedValue);
}
@ -138,7 +146,7 @@ public abstract class Cast<R extends ConnectRecord<R>> implements Transformation
// Whole-record casting
if (wholeValueCastType != null)
return newRecord(record, updatedSchema, castValueToType(operatingValue(record), wholeValueCastType));
return newRecord(record, updatedSchema, castValueToType(valueSchema, operatingValue(record), wholeValueCastType));
// Casting within a struct
final Struct value = requireStruct(operatingValue(record), PURPOSE);
@ -147,7 +155,8 @@ public abstract class Cast<R extends ConnectRecord<R>> implements Transformation
for (Field field : value.schema().fields()) {
final Object origFieldValue = value.get(field);
final Schema.Type targetType = casts.get(field.name());
final Object newFieldValue = targetType != null ? castValueToType(origFieldValue, targetType) : origFieldValue;
final Object newFieldValue = targetType != null ? castValueToType(field.schema(), origFieldValue, targetType) : origFieldValue;
log.trace("Cast field '{}' from '{}' to '{}'", field.name(), origFieldValue, newFieldValue);
updatedValue.put(updatedSchema.field(field.name()), newFieldValue);
}
return newRecord(record, updatedSchema, updatedValue);
@ -168,8 +177,10 @@ public abstract class Cast<R extends ConnectRecord<R>> implements Transformation
SchemaBuilder fieldBuilder = convertFieldType(casts.get(field.name()));
if (field.schema().isOptional())
fieldBuilder.optional();
if (field.schema().defaultValue() != null)
fieldBuilder.defaultValue(castValueToType(field.schema().defaultValue(), fieldBuilder.type()));
if (field.schema().defaultValue() != null) {
Schema fieldSchema = field.schema();
fieldBuilder.defaultValue(castValueToType(fieldSchema, fieldSchema.defaultValue(), fieldBuilder.type()));
}
builder.field(field.name(), fieldBuilder.build());
} else {
builder.field(field.name(), field.schema());
@ -181,7 +192,7 @@ public abstract class Cast<R extends ConnectRecord<R>> implements Transformation
if (valueSchema.isOptional())
builder.optional();
if (valueSchema.defaultValue() != null)
builder.defaultValue(castValueToType(valueSchema.defaultValue(), builder.type()));
builder.defaultValue(castValueToType(valueSchema, valueSchema.defaultValue(), builder.type()));
updatedSchema = builder.build();
schemaUpdateCache.put(valueSchema, updatedSchema);
@ -212,11 +223,12 @@ public abstract class Cast<R extends ConnectRecord<R>> implements Transformation
}
private static Object castValueToType(Object value, Schema.Type targetType) {
private static Object castValueToType(Schema schema, Object value, Schema.Type targetType) {
try {
if (value == null) return null;
Schema.Type inferredType = ConnectSchema.schemaType(value.getClass());
Schema.Type inferredType = schema == null ? ConnectSchema.schemaType(value.getClass()) :
schema.type();
if (inferredType == null) {
throw new DataException("Cast transformation was passed a value of type " + value.getClass()
+ " which is not supported by Connect's data API");
@ -327,7 +339,12 @@ public abstract class Cast<R extends ConnectRecord<R>> implements Transformation
}
private static String castToString(Object value) {
return value.toString();
if (value instanceof java.util.Date) {
java.util.Date dateValue = (java.util.Date) value;
return Values.dateFormatFor(dateValue).format(dateValue);
} else {
return value.toString();
}
}
protected abstract Schema operatingSchema(R record);
@ -370,15 +387,19 @@ public abstract class Cast<R extends ConnectRecord<R>> implements Transformation
}
private static Schema.Type validCastType(Schema.Type type, FieldType fieldType) {
if (!SUPPORTED_CAST_TYPES.contains(type)) {
String message = "Cast transformation does not support casting to/from " + type
+ "; supported types are " + SUPPORTED_CAST_TYPES;
switch (fieldType) {
case INPUT:
throw new DataException(message);
case OUTPUT:
throw new ConfigException(message);
}
switch (fieldType) {
case INPUT:
if (!SUPPORTED_CAST_INPUT_TYPES.contains(type)) {
throw new DataException("Cast transformation does not support casting from " +
type + "; supported types are " + SUPPORTED_CAST_INPUT_TYPES);
}
break;
case OUTPUT:
if (!SUPPORTED_CAST_OUTPUT_TYPES.contains(type)) {
throw new ConfigException("Cast transformation does not support casting to " +
type + "; supported types are " + SUPPORTED_CAST_OUTPUT_TYPES);
}
break;
}
return type;
}

View File

@ -18,15 +18,18 @@
package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.data.Values;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Test;
import java.math.BigDecimal;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
@ -39,6 +42,7 @@ import static org.junit.Assert.assertTrue;
public class CastTest {
private final Cast<SourceRecord> xformKey = new Cast.Key<>();
private final Cast<SourceRecord> xformValue = new Cast.Value<>();
private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000;
@After
public void teardown() {
@ -61,6 +65,11 @@ public class CastTest {
xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:array"));
}
@Test(expected = ConfigException.class)
public void testUnsupportedTargetType() {
xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:bytes"));
}
@Test(expected = ConfigException.class)
public void testConfigInvalidMap() {
xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int8:extra"));
@ -171,6 +180,28 @@ public class CastTest {
assertEquals("42", transformed.value());
}
@Test
public void castWholeBigDecimalRecordValueWithSchemaString() {
BigDecimal bigDecimal = new BigDecimal(42);
xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "string"));
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
Decimal.schema(bigDecimal.scale()), bigDecimal));
assertEquals(Schema.Type.STRING, transformed.valueSchema().type());
assertEquals("42", transformed.value());
}
@Test
public void castWholeDateRecordValueWithSchemaString() {
Date timestamp = new Date(MILLIS_PER_DAY + 1); // day + 1msec to get a timestamp formatting.
xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "string"));
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
Timestamp.SCHEMA, timestamp));
assertEquals(Schema.Type.STRING, transformed.valueSchema().type());
assertEquals(Values.dateFormatFor(timestamp).format(timestamp), transformed.value());
}
@Test
public void castWholeRecordDefaultValue() {
// Validate default value in schema is correctly converted
@ -292,7 +323,8 @@ public class CastTest {
@Test
public void castFieldsWithSchema() {
xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8:int16,int16:int32,int32:int64,int64:boolean,float32:float64,float64:boolean,boolean:int8,string:int32,optional:int32"));
Date day = new Date(MILLIS_PER_DAY);
xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8:int16,int16:int32,int32:int64,int64:boolean,float32:float64,float64:boolean,boolean:int8,string:int32,bigdecimal:string,date:string,optional:int32"));
// Include an optional fields and fields with defaults to validate their values are passed through properly
SchemaBuilder builder = SchemaBuilder.struct();
@ -305,6 +337,8 @@ public class CastTest {
builder.field("float64", SchemaBuilder.float64().defaultValue(-1.125).build());
builder.field("boolean", Schema.BOOLEAN_SCHEMA);
builder.field("string", Schema.STRING_SCHEMA);
builder.field("bigdecimal", Decimal.schema(new BigDecimal(42).scale()));
builder.field("date", Timestamp.SCHEMA);
builder.field("optional", Schema.OPTIONAL_FLOAT32_SCHEMA);
builder.field("timestamp", Timestamp.SCHEMA);
Schema supportedTypesSchema = builder.build();
@ -317,6 +351,8 @@ public class CastTest {
recordValue.put("float32", 32.f);
recordValue.put("float64", -64.);
recordValue.put("boolean", true);
recordValue.put("bigdecimal", new BigDecimal(42));
recordValue.put("date", day);
recordValue.put("string", "42");
recordValue.put("timestamp", new Date(0));
// optional field intentionally omitted
@ -335,8 +371,25 @@ public class CastTest {
assertEquals(true, ((Struct) transformed.value()).schema().field("float64").schema().defaultValue());
assertEquals((byte) 1, ((Struct) transformed.value()).get("boolean"));
assertEquals(42, ((Struct) transformed.value()).get("string"));
assertEquals("42", ((Struct) transformed.value()).get("bigdecimal"));
assertEquals(Values.dateFormatFor(day).format(day), ((Struct) transformed.value()).get("date"));
assertEquals(new Date(0), ((Struct) transformed.value()).get("timestamp"));
assertNull(((Struct) transformed.value()).get("optional"));
Schema transformedSchema = ((Struct) transformed.value()).schema();
assertEquals(Schema.INT16_SCHEMA.type(), transformedSchema.field("int8").schema().type());
assertEquals(Schema.OPTIONAL_INT32_SCHEMA.type(), transformedSchema.field("int16").schema().type());
assertEquals(Schema.INT64_SCHEMA.type(), transformedSchema.field("int32").schema().type());
assertEquals(Schema.BOOLEAN_SCHEMA.type(), transformedSchema.field("int64").schema().type());
assertEquals(Schema.FLOAT64_SCHEMA.type(), transformedSchema.field("float32").schema().type());
assertEquals(Schema.BOOLEAN_SCHEMA.type(), transformedSchema.field("float64").schema().type());
assertEquals(Schema.INT8_SCHEMA.type(), transformedSchema.field("boolean").schema().type());
assertEquals(Schema.INT32_SCHEMA.type(), transformedSchema.field("string").schema().type());
assertEquals(Schema.STRING_SCHEMA.type(), transformedSchema.field("bigdecimal").schema().type());
assertEquals(Schema.STRING_SCHEMA.type(), transformedSchema.field("date").schema().type());
assertEquals(Schema.OPTIONAL_INT32_SCHEMA.type(), transformedSchema.field("optional").schema().type());
// The following fields are not changed
assertEquals(Timestamp.SCHEMA.type(), transformedSchema.field("timestamp").schema().type());
}
@SuppressWarnings("unchecked")