KAFKA-7157: Fix handling of nulls in TimestampConverter (#7070)

Fix handling of nulls in TimestampConverter.

Authors: Valeria Vasylieva <valeria.vasylieva@gmail.com>, Robert Yokota <rayokota@gmail.com>
Reviewers: Arjun Satish <arjun@confluent.io>, Randall Hauch <rhauch@gmail.com>
This commit is contained in:
Robert Yokota 2019-07-12 10:12:20 -07:00 committed by Randall Hauch
parent 8c93b7ecd6
commit c05ed1eae4
2 changed files with 243 additions and 42 deletions

View File

@ -47,7 +47,7 @@ import java.util.Set;
import java.util.TimeZone;
import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull;
public abstract class TimestampConverter<R extends ConnectRecord<R>> implements Transformation<R> {
@ -85,6 +85,10 @@ public abstract class TimestampConverter<R extends ConnectRecord<R>> implements
private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
public static final Schema OPTIONAL_DATE_SCHEMA = org.apache.kafka.connect.data.Date.builder().optional().schema();
public static final Schema OPTIONAL_TIMESTAMP_SCHEMA = Timestamp.builder().optional().schema();
public static final Schema OPTIONAL_TIME_SCHEMA = Time.builder().optional().schema();
private interface TimestampTranslator {
/**
* Convert from the type-specific format to the universal java.util.Date format
@ -94,7 +98,7 @@ public abstract class TimestampConverter<R extends ConnectRecord<R>> implements
/**
* Get the schema for this format.
*/
Schema typeSchema();
Schema typeSchema(boolean isOptional);
/**
* Convert from the universal java.util.Date format to the type-specific format
@ -118,8 +122,8 @@ public abstract class TimestampConverter<R extends ConnectRecord<R>> implements
}
@Override
public Schema typeSchema() {
return Schema.STRING_SCHEMA;
public Schema typeSchema(boolean isOptional) {
return isOptional ? Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA;
}
@Override
@ -139,8 +143,8 @@ public abstract class TimestampConverter<R extends ConnectRecord<R>> implements
}
@Override
public Schema typeSchema() {
return Schema.INT64_SCHEMA;
public Schema typeSchema(boolean isOptional) {
return isOptional ? Schema.OPTIONAL_INT64_SCHEMA : Schema.INT64_SCHEMA;
}
@Override
@ -159,8 +163,8 @@ public abstract class TimestampConverter<R extends ConnectRecord<R>> implements
}
@Override
public Schema typeSchema() {
return org.apache.kafka.connect.data.Date.SCHEMA;
public Schema typeSchema(boolean isOptional) {
return isOptional ? OPTIONAL_DATE_SCHEMA : org.apache.kafka.connect.data.Date.SCHEMA;
}
@Override
@ -185,8 +189,8 @@ public abstract class TimestampConverter<R extends ConnectRecord<R>> implements
}
@Override
public Schema typeSchema() {
return Time.SCHEMA;
public Schema typeSchema(boolean isOptional) {
return isOptional ? OPTIONAL_TIME_SCHEMA : Time.SCHEMA;
}
@Override
@ -212,8 +216,8 @@ public abstract class TimestampConverter<R extends ConnectRecord<R>> implements
}
@Override
public Schema typeSchema() {
return Timestamp.SCHEMA;
public Schema typeSchema(boolean isOptional) {
return isOptional ? OPTIONAL_TIMESTAMP_SCHEMA : Timestamp.SCHEMA;
}
@Override
@ -330,16 +334,16 @@ public abstract class TimestampConverter<R extends ConnectRecord<R>> implements
if (config.field.isEmpty()) {
Object value = operatingValue(record);
// New schema is determined by the requested target timestamp type
Schema updatedSchema = TRANSLATORS.get(config.type).typeSchema();
Schema updatedSchema = TRANSLATORS.get(config.type).typeSchema(schema.isOptional());
return newRecord(record, updatedSchema, convertTimestamp(value, timestampTypeFromSchema(schema)));
} else {
final Struct value = requireStruct(operatingValue(record), PURPOSE);
Schema updatedSchema = schemaUpdateCache.get(value.schema());
final Struct value = requireStructOrNull(operatingValue(record), PURPOSE);
Schema updatedSchema = schemaUpdateCache.get(schema);
if (updatedSchema == null) {
SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct());
for (Field field : schema.fields()) {
if (field.name().equals(config.field)) {
builder.field(field.name(), TRANSLATORS.get(config.type).typeSchema());
builder.field(field.name(), TRANSLATORS.get(config.type).typeSchema(field.schema().isOptional()));
} else {
builder.field(field.name(), field.schema());
}
@ -361,6 +365,9 @@ public abstract class TimestampConverter<R extends ConnectRecord<R>> implements
}
private Struct applyValueWithSchema(Struct value, Schema updatedSchema) {
if (value == null) {
return null;
}
Struct updatedValue = new Struct(updatedSchema);
for (Field field : value.schema().fields()) {
final Object updatedFieldValue;
@ -375,11 +382,11 @@ public abstract class TimestampConverter<R extends ConnectRecord<R>> implements
}
private R applySchemaless(R record) {
if (config.field.isEmpty()) {
Object value = operatingValue(record);
return newRecord(record, null, convertTimestamp(value));
Object rawValue = operatingValue(record);
if (rawValue == null || config.field.isEmpty()) {
return newRecord(record, null, convertTimestamp(rawValue));
} else {
final Map<String, Object> value = requireMap(operatingValue(record), PURPOSE);
final Map<String, Object> value = requireMap(rawValue, PURPOSE);
final HashMap<String, Object> updatedValue = new HashMap<>(value);
updatedValue.put(config.field, convertTimestamp(value.get(config.field)));
return newRecord(record, null, updatedValue);
@ -424,11 +431,14 @@ public abstract class TimestampConverter<R extends ConnectRecord<R>> implements
/**
* Convert the given timestamp to the target timestamp format.
* @param timestamp the input timestamp
* @param timestamp the input timestamp, may be null
* @param timestampFormat the format of the timestamp, or null if the format should be inferred
* @return the converted timestamp
*/
private Object convertTimestamp(Object timestamp, String timestampFormat) {
if (timestamp == null) {
return null;
}
if (timestampFormat == null) {
timestampFormat = inferTimestampType(timestamp);
}

View File

@ -35,6 +35,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@ -105,13 +106,12 @@ public class TimestampConverterTest {
xformValue.configure(config);
}
// Conversions without schemas (most flexible Timestamp -> other types)
@Test
public void testSchemalessIdentity() {
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
SourceRecord transformed = xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME.getTime()));
assertNull(transformed.valueSchema());
assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
@ -120,7 +120,7 @@ public class TimestampConverterTest {
@Test
public void testSchemalessTimestampToDate() {
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Date"));
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
SourceRecord transformed = xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME.getTime()));
assertNull(transformed.valueSchema());
assertEquals(DATE.getTime(), transformed.value());
@ -129,7 +129,7 @@ public class TimestampConverterTest {
@Test
public void testSchemalessTimestampToTime() {
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Time"));
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
SourceRecord transformed = xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME.getTime()));
assertNull(transformed.valueSchema());
assertEquals(TIME.getTime(), transformed.value());
@ -138,7 +138,7 @@ public class TimestampConverterTest {
@Test
public void testSchemalessTimestampToUnix() {
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "unix"));
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
SourceRecord transformed = xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME.getTime()));
assertNull(transformed.valueSchema());
assertEquals(DATE_PLUS_TIME_UNIX, transformed.value());
@ -150,7 +150,7 @@ public class TimestampConverterTest {
config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string");
config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
xformValue.configure(config);
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime()));
SourceRecord transformed = xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME.getTime()));
assertNull(transformed.valueSchema());
assertEquals(DATE_PLUS_TIME_STRING, transformed.value());
@ -162,7 +162,7 @@ public class TimestampConverterTest {
@Test
public void testSchemalessDateToTimestamp() {
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE.getTime()));
SourceRecord transformed = xformValue.apply(createRecordSchemaless(DATE.getTime()));
assertNull(transformed.valueSchema());
// No change expected since the source type is coarser-grained
@ -172,7 +172,7 @@ public class TimestampConverterTest {
@Test
public void testSchemalessTimeToTimestamp() {
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, TIME.getTime()));
SourceRecord transformed = xformValue.apply(createRecordSchemaless(TIME.getTime()));
assertNull(transformed.valueSchema());
// No change expected since the source type is coarser-grained
@ -182,7 +182,7 @@ public class TimestampConverterTest {
@Test
public void testSchemalessUnixToTimestamp() {
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME_UNIX));
SourceRecord transformed = xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME_UNIX));
assertNull(transformed.valueSchema());
assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
@ -194,7 +194,7 @@ public class TimestampConverterTest {
config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp");
config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
xformValue.configure(config);
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME_STRING));
SourceRecord transformed = xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME_STRING));
assertNull(transformed.valueSchema());
assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
@ -206,7 +206,7 @@ public class TimestampConverterTest {
@Test
public void testWithSchemaIdentity() {
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
SourceRecord transformed = xformValue.apply(createRecordWithSchema(Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
@ -215,7 +215,7 @@ public class TimestampConverterTest {
@Test
public void testWithSchemaTimestampToDate() {
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Date"));
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
SourceRecord transformed = xformValue.apply(createRecordWithSchema(Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
assertEquals(Date.SCHEMA, transformed.valueSchema());
assertEquals(DATE.getTime(), transformed.value());
@ -224,7 +224,7 @@ public class TimestampConverterTest {
@Test
public void testWithSchemaTimestampToTime() {
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Time"));
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
SourceRecord transformed = xformValue.apply(createRecordWithSchema(Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
assertEquals(Time.SCHEMA, transformed.valueSchema());
assertEquals(TIME.getTime(), transformed.value());
@ -233,7 +233,7 @@ public class TimestampConverterTest {
@Test
public void testWithSchemaTimestampToUnix() {
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "unix"));
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
SourceRecord transformed = xformValue.apply(createRecordWithSchema(Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
assertEquals(Schema.INT64_SCHEMA, transformed.valueSchema());
assertEquals(DATE_PLUS_TIME_UNIX, transformed.value());
@ -245,19 +245,70 @@ public class TimestampConverterTest {
config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string");
config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
xformValue.configure(config);
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
SourceRecord transformed = xformValue.apply(createRecordWithSchema(Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
assertEquals(Schema.STRING_SCHEMA, transformed.valueSchema());
assertEquals(DATE_PLUS_TIME_STRING, transformed.value());
}
// Null-value conversions schemaless
@Test
public void testSchemalessNullValueToString() {
testSchemalessNullValueConversion("string");
testSchemalessNullFieldConversion("string");
}
@Test
public void testSchemalessNullValueToDate() {
testSchemalessNullValueConversion("Date");
testSchemalessNullFieldConversion("Date");
}
@Test
public void testSchemalessNullValueToTimestamp() {
testSchemalessNullValueConversion("Timestamp");
testSchemalessNullFieldConversion("Timestamp");
}
@Test
public void testSchemalessNullValueToUnix() {
testSchemalessNullValueConversion("unix");
testSchemalessNullFieldConversion("unix");
}
@Test
public void testSchemalessNullValueToTime() {
testSchemalessNullValueConversion("Time");
testSchemalessNullFieldConversion("Time");
}
private void testSchemalessNullValueConversion(String targetType) {
Map<String, String> config = new HashMap<>();
config.put(TimestampConverter.TARGET_TYPE_CONFIG, targetType);
config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
xformValue.configure(config);
SourceRecord transformed = xformValue.apply(createRecordSchemaless(null));
assertNull(transformed.valueSchema());
assertNull(transformed.value());
}
private void testSchemalessNullFieldConversion(String targetType) {
Map<String, String> config = new HashMap<>();
config.put(TimestampConverter.TARGET_TYPE_CONFIG, targetType);
config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
config.put(TimestampConverter.FIELD_CONFIG, "ts");
xformValue.configure(config);
SourceRecord transformed = xformValue.apply(createRecordSchemaless(null));
assertNull(transformed.valueSchema());
assertNull(transformed.value());
}
// Conversions with schemas (core types -> most flexible Timestamp format)
@Test
public void testWithSchemaDateToTimestamp() {
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Date.SCHEMA, DATE.getTime()));
SourceRecord transformed = xformValue.apply(createRecordWithSchema(Date.SCHEMA, DATE.getTime()));
assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
// No change expected since the source type is coarser-grained
@ -267,7 +318,7 @@ public class TimestampConverterTest {
@Test
public void testWithSchemaTimeToTimestamp() {
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Time.SCHEMA, TIME.getTime()));
SourceRecord transformed = xformValue.apply(createRecordWithSchema(Time.SCHEMA, TIME.getTime()));
assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
// No change expected since the source type is coarser-grained
@ -277,7 +328,7 @@ public class TimestampConverterTest {
@Test
public void testWithSchemaUnixToTimestamp() {
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Schema.INT64_SCHEMA, DATE_PLUS_TIME_UNIX));
SourceRecord transformed = xformValue.apply(createRecordWithSchema(Schema.INT64_SCHEMA, DATE_PLUS_TIME_UNIX));
assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
@ -289,12 +340,145 @@ public class TimestampConverterTest {
config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp");
config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
xformValue.configure(config);
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, Schema.STRING_SCHEMA, DATE_PLUS_TIME_STRING));
SourceRecord transformed = xformValue.apply(createRecordWithSchema(Schema.STRING_SCHEMA, DATE_PLUS_TIME_STRING));
assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
}
// Null-value conversions with schema
@Test
public void testWithSchemaNullValueToTimestamp() {
testWithSchemaNullValueConversion("Timestamp", Schema.OPTIONAL_INT64_SCHEMA, TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA);
testWithSchemaNullValueConversion("Timestamp", TimestampConverter.OPTIONAL_TIME_SCHEMA, TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA);
testWithSchemaNullValueConversion("Timestamp", TimestampConverter.OPTIONAL_DATE_SCHEMA, TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA);
testWithSchemaNullValueConversion("Timestamp", Schema.OPTIONAL_STRING_SCHEMA, TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA);
testWithSchemaNullValueConversion("Timestamp", TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA);
}
@Test
public void testWithSchemaNullFieldToTimestamp() {
testWithSchemaNullFieldConversion("Timestamp", Schema.OPTIONAL_INT64_SCHEMA, TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA);
testWithSchemaNullFieldConversion("Timestamp", TimestampConverter.OPTIONAL_TIME_SCHEMA, TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA);
testWithSchemaNullFieldConversion("Timestamp", TimestampConverter.OPTIONAL_DATE_SCHEMA, TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA);
testWithSchemaNullFieldConversion("Timestamp", Schema.OPTIONAL_STRING_SCHEMA, TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA);
testWithSchemaNullFieldConversion("Timestamp", TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA);
}
@Test
public void testWithSchemaNullValueToUnix() {
testWithSchemaNullValueConversion("unix", Schema.OPTIONAL_INT64_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA);
testWithSchemaNullValueConversion("unix", TimestampConverter.OPTIONAL_TIME_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA);
testWithSchemaNullValueConversion("unix", TimestampConverter.OPTIONAL_DATE_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA);
testWithSchemaNullValueConversion("unix", Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA);
testWithSchemaNullValueConversion("unix", TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA);
}
@Test
public void testWithSchemaNullFieldToUnix() {
testWithSchemaNullFieldConversion("unix", Schema.OPTIONAL_INT64_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA);
testWithSchemaNullFieldConversion("unix", TimestampConverter.OPTIONAL_TIME_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA);
testWithSchemaNullFieldConversion("unix", TimestampConverter.OPTIONAL_DATE_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA);
testWithSchemaNullFieldConversion("unix", Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA);
testWithSchemaNullFieldConversion("unix", TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA);
}
@Test
public void testWithSchemaNullValueToTime() {
testWithSchemaNullValueConversion("Time", Schema.OPTIONAL_INT64_SCHEMA, TimestampConverter.OPTIONAL_TIME_SCHEMA);
testWithSchemaNullValueConversion("Time", TimestampConverter.OPTIONAL_TIME_SCHEMA, TimestampConverter.OPTIONAL_TIME_SCHEMA);
testWithSchemaNullValueConversion("Time", TimestampConverter.OPTIONAL_DATE_SCHEMA, TimestampConverter.OPTIONAL_TIME_SCHEMA);
testWithSchemaNullValueConversion("Time", Schema.OPTIONAL_STRING_SCHEMA, TimestampConverter.OPTIONAL_TIME_SCHEMA);
testWithSchemaNullValueConversion("Time", TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, TimestampConverter.OPTIONAL_TIME_SCHEMA);
}
@Test
public void testWithSchemaNullFieldToTime() {
testWithSchemaNullFieldConversion("Time", Schema.OPTIONAL_INT64_SCHEMA, TimestampConverter.OPTIONAL_TIME_SCHEMA);
testWithSchemaNullFieldConversion("Time", TimestampConverter.OPTIONAL_TIME_SCHEMA, TimestampConverter.OPTIONAL_TIME_SCHEMA);
testWithSchemaNullFieldConversion("Time", TimestampConverter.OPTIONAL_DATE_SCHEMA, TimestampConverter.OPTIONAL_TIME_SCHEMA);
testWithSchemaNullFieldConversion("Time", Schema.OPTIONAL_STRING_SCHEMA, TimestampConverter.OPTIONAL_TIME_SCHEMA);
testWithSchemaNullFieldConversion("Time", TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, TimestampConverter.OPTIONAL_TIME_SCHEMA);
}
@Test
public void testWithSchemaNullValueToDate() {
testWithSchemaNullValueConversion("Date", Schema.OPTIONAL_INT64_SCHEMA, TimestampConverter.OPTIONAL_DATE_SCHEMA);
testWithSchemaNullValueConversion("Date", TimestampConverter.OPTIONAL_TIME_SCHEMA, TimestampConverter.OPTIONAL_DATE_SCHEMA);
testWithSchemaNullValueConversion("Date", TimestampConverter.OPTIONAL_DATE_SCHEMA, TimestampConverter.OPTIONAL_DATE_SCHEMA);
testWithSchemaNullValueConversion("Date", Schema.OPTIONAL_STRING_SCHEMA, TimestampConverter.OPTIONAL_DATE_SCHEMA);
testWithSchemaNullValueConversion("Date", TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, TimestampConverter.OPTIONAL_DATE_SCHEMA);
}
@Test
public void testWithSchemaNullFieldToDate() {
testWithSchemaNullFieldConversion("Date", Schema.OPTIONAL_INT64_SCHEMA, TimestampConverter.OPTIONAL_DATE_SCHEMA);
testWithSchemaNullFieldConversion("Date", TimestampConverter.OPTIONAL_TIME_SCHEMA, TimestampConverter.OPTIONAL_DATE_SCHEMA);
testWithSchemaNullFieldConversion("Date", TimestampConverter.OPTIONAL_DATE_SCHEMA, TimestampConverter.OPTIONAL_DATE_SCHEMA);
testWithSchemaNullFieldConversion("Date", Schema.OPTIONAL_STRING_SCHEMA, TimestampConverter.OPTIONAL_DATE_SCHEMA);
testWithSchemaNullFieldConversion("Date", TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, TimestampConverter.OPTIONAL_DATE_SCHEMA);
}
@Test
public void testWithSchemaNullValueToString() {
testWithSchemaNullValueConversion("string", Schema.OPTIONAL_INT64_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA);
testWithSchemaNullValueConversion("string", TimestampConverter.OPTIONAL_TIME_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA);
testWithSchemaNullValueConversion("string", TimestampConverter.OPTIONAL_DATE_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA);
testWithSchemaNullValueConversion("string", Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA);
testWithSchemaNullValueConversion("string", TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA);
}
@Test
public void testWithSchemaNullFieldToString() {
testWithSchemaNullFieldConversion("string", Schema.OPTIONAL_INT64_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA);
testWithSchemaNullFieldConversion("string", TimestampConverter.OPTIONAL_TIME_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA);
testWithSchemaNullFieldConversion("string", TimestampConverter.OPTIONAL_DATE_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA);
testWithSchemaNullFieldConversion("string", Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA);
testWithSchemaNullFieldConversion("string", TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA);
}
private void testWithSchemaNullValueConversion(String targetType, Schema originalSchema, Schema expectedSchema) {
Map<String, String> config = new HashMap<>();
config.put(TimestampConverter.TARGET_TYPE_CONFIG, targetType);
config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
xformValue.configure(config);
SourceRecord transformed = xformValue.apply(createRecordWithSchema(originalSchema, null));
assertEquals(expectedSchema, transformed.valueSchema());
assertNull(transformed.value());
}
private void testWithSchemaNullFieldConversion(String targetType, Schema originalSchema, Schema expectedSchema) {
Map<String, String> config = new HashMap<>();
config.put(TimestampConverter.TARGET_TYPE_CONFIG, targetType);
config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
config.put(TimestampConverter.FIELD_CONFIG, "ts");
xformValue.configure(config);
SchemaBuilder structSchema = SchemaBuilder.struct()
.field("ts", originalSchema)
.field("other", Schema.STRING_SCHEMA);
SchemaBuilder expectedStructSchema = SchemaBuilder.struct()
.field("ts", expectedSchema)
.field("other", Schema.STRING_SCHEMA);
Struct original = new Struct(structSchema);
original.put("ts", null);
original.put("other", "test");
// Struct field is null
SourceRecord transformed = xformValue.apply(createRecordWithSchema(structSchema.build(), original));
assertEquals(expectedStructSchema.build(), transformed.valueSchema());
assertNull(requireStruct(transformed.value(), "").get("ts"));
// entire Struct is null
transformed = xformValue.apply(createRecordWithSchema(structSchema.optional().build(), null));
assertEquals(expectedStructSchema.optional().build(), transformed.valueSchema());
assertNull(transformed.value());
}
// Convert field instead of entire key/value
@ -306,7 +490,7 @@ public class TimestampConverterTest {
xformValue.configure(config);
Object value = Collections.singletonMap("ts", DATE_PLUS_TIME.getTime());
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, null, value));
SourceRecord transformed = xformValue.apply(createRecordSchemaless(value));
assertNull(transformed.valueSchema());
assertEquals(Collections.singletonMap("ts", DATE.getTime()), transformed.value());
@ -328,7 +512,7 @@ public class TimestampConverterTest {
original.put("ts", DATE_PLUS_TIME_UNIX);
original.put("other", "test");
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0, structWithTimestampFieldSchema, original));
SourceRecord transformed = xformValue.apply(createRecordWithSchema(structWithTimestampFieldSchema, original));
Schema expectedSchema = SchemaBuilder.struct()
.field("ts", Timestamp.SCHEMA)
@ -351,4 +535,11 @@ public class TimestampConverterTest {
assertEquals(DATE_PLUS_TIME.getTime(), transformed.key());
}
private SourceRecord createRecordWithSchema(Schema schema, Object value) {
return new SourceRecord(null, null, "topic", 0, schema, value);
}
private SourceRecord createRecordSchemaless(Object value) {
return createRecordWithSchema(null, value);
}
}