KAFKA-9706: Handle null in keys or values when Flatten transformation is used (#8279)

* Fixed DataException thrown when handling tombstone events with null value
* Passes through original record when finding a null key when it's configured for keys or a null value when it's configured for values. 
* Added unit tests for schema and schemaless data
This commit is contained in:
Greg Harris 2020-03-30 15:09:27 -07:00 committed by Konstantine Karantasis
parent fd4512fa71
commit ee6fbc640e
2 changed files with 28 additions and 1 deletions

View File

@ -69,7 +69,9 @@ public abstract class Flatten<R extends ConnectRecord<R>> implements Transformat
@Override
public R apply(R record) {
if (operatingSchema(record) == null) {
if (operatingValue(record) == null) {
return record;
} else if (operatingSchema(record) == null) {
return applySchemaless(record);
} else {
return applyWithSchema(record);

View File

@ -297,4 +297,29 @@ public class FlattenTest {
Schema transformedOptFieldSchema = SchemaBuilder.string().optional().defaultValue("child_default").build();
assertEquals(transformedOptFieldSchema, transformedSchema.field("opt_field").schema());
}
@Test
public void tombstoneEventWithoutSchemaShouldPassThrough() {
xformValue.configure(Collections.<String, String>emptyMap());
final SourceRecord record = new SourceRecord(null, null, "test", 0,
null, null);
final SourceRecord transformedRecord = xformValue.apply(record);
assertEquals(null, transformedRecord.value());
assertEquals(null, transformedRecord.valueSchema());
}
@Test
public void tombstoneEventWithSchemaShouldPassThrough() {
xformValue.configure(Collections.<String, String>emptyMap());
final Schema simpleStructSchema = SchemaBuilder.struct().name("name").version(1).doc("doc").field("magic", Schema.OPTIONAL_INT64_SCHEMA).build();
final SourceRecord record = new SourceRecord(null, null, "test", 0,
simpleStructSchema, null);
final SourceRecord transformedRecord = xformValue.apply(record);
assertEquals(null, transformedRecord.value());
assertEquals(simpleStructSchema, transformedRecord.valueSchema());
}
}