KAFKA-6605: Fix NPE in Flatten when optional Struct is null (#5705)

Correct the Flatten SMT to properly handle null key or value `Struct` instances.

Author: Michal Borowiecki <michal.borowiecki@openbet.com>
Reviewers: Arjun Satish <arjun@confluent.io>, Robert Yokota <rayokota@gmail.com>, Randall Hauch <rhauch@gmail.com>
This commit is contained in:
Michał Borowiecki 2019-07-12 16:27:33 +01:00 committed by Randall Hauch
parent cf3f9ac4ea
commit d78fe37ae0
2 changed files with 58 additions and 11 deletions

View File

@ -35,7 +35,7 @@ import java.util.LinkedHashMap;
import java.util.Map;
import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull;
public abstract class Flatten<R extends ConnectRecord<R>> implements Transformation<R> {
@ -136,20 +136,24 @@ public abstract class Flatten<R extends ConnectRecord<R>> implements Transformat
}
private R applyWithSchema(R record) {
final Struct value = requireStruct(operatingValue(record), PURPOSE);
final Struct value = requireStructOrNull(operatingValue(record), PURPOSE);
Schema updatedSchema = schemaUpdateCache.get(value.schema());
Schema schema = operatingSchema(record);
Schema updatedSchema = schemaUpdateCache.get(schema);
if (updatedSchema == null) {
final SchemaBuilder builder = SchemaUtil.copySchemaBasics(value.schema(), SchemaBuilder.struct());
Struct defaultValue = (Struct) value.schema().defaultValue();
buildUpdatedSchema(value.schema(), "", builder, value.schema().isOptional(), defaultValue);
final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct());
Struct defaultValue = (Struct) schema.defaultValue();
buildUpdatedSchema(schema, "", builder, schema.isOptional(), defaultValue);
updatedSchema = builder.build();
schemaUpdateCache.put(value.schema(), updatedSchema);
schemaUpdateCache.put(schema, updatedSchema);
}
if (value == null) {
return newRecord(record, updatedSchema, null);
} else {
final Struct updatedValue = new Struct(updatedSchema);
buildWithSchema(value, "", updatedValue);
return newRecord(record, updatedSchema, updatedValue);
}
final Struct updatedValue = new Struct(updatedSchema);
buildWithSchema(value, "", updatedValue);
return newRecord(record, updatedSchema, updatedValue);
}
/**
@ -216,6 +220,9 @@ public abstract class Flatten<R extends ConnectRecord<R>> implements Transformat
}
private void buildWithSchema(Struct record, String fieldNamePrefix, Struct newRecord) {
if (record == null) {
return;
}
for (Field field : record.schema().fields()) {
final String fieldName = fieldName(fieldNamePrefix, field.name());
switch (field.schema().type()) {

View File

@ -181,6 +181,46 @@ public class FlattenTest {
assertNull(transformedStruct.get("B.opt_int32"));
}
@Test
public void testOptionalStruct() {
xformValue.configure(Collections.<String, String>emptyMap());
SchemaBuilder builder = SchemaBuilder.struct().optional();
builder.field("opt_int32", Schema.OPTIONAL_INT32_SCHEMA);
Schema schema = builder.build();
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null,
"topic", 0,
schema, null));
assertEquals(Schema.Type.STRUCT, transformed.valueSchema().type());
assertNull(transformed.value());
}
@Test
public void testOptionalNestedStruct() {
xformValue.configure(Collections.<String, String>emptyMap());
SchemaBuilder builder = SchemaBuilder.struct().optional();
builder.field("opt_int32", Schema.OPTIONAL_INT32_SCHEMA);
Schema supportedTypesSchema = builder.build();
builder = SchemaBuilder.struct();
builder.field("B", supportedTypesSchema);
Schema oneLevelNestedSchema = builder.build();
Struct oneLevelNestedStruct = new Struct(oneLevelNestedSchema);
oneLevelNestedStruct.put("B", null);
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null,
"topic", 0,
oneLevelNestedSchema, oneLevelNestedStruct));
assertEquals(Schema.Type.STRUCT, transformed.valueSchema().type());
Struct transformedStruct = (Struct) transformed.value();
assertNull(transformedStruct.get("B.opt_int32"));
}
@Test
public void testOptionalFieldMap() {
xformValue.configure(Collections.<String, String>emptyMap());