KAFKA-12305: Fix Flatten SMT for array types (#10074)

Reviewers: Nigel Liang <nigel@nigelliang.com>, Tom Bentley <tbentley@redhat.com>
Chris Egerton authored 2021-08-03 08:53:54 -04:00, committed by GitHub
parent b980ca8709
commit 79788ca042
3 changed files with 34 additions and 7 deletions

Flatten.java

@@ -42,7 +42,7 @@ public abstract class Flatten<R extends ConnectRecord<R>> implements Transformat
public static final String OVERVIEW_DOC =
"Flatten a nested data structure, generating names for each field by concatenating the field names at each "
+ "level with a configurable delimiter character. Applies to Struct when schema present, or a Map "
+ "in the case of schemaless data. The default delimiter is '.'."
+ "in the case of schemaless data. Array fields and their contents are not modified. The default delimiter is '.'."
+ "<p/>Use the concrete transformation type designed for the record key (<code>" + Key.class.getName() + "</code>) "
+ "or value (<code>" + Value.class.getName() + "</code>).";
@@ -124,6 +124,7 @@ public abstract class Flatten<R extends ConnectRecord<R>> implements Transformat
case BOOLEAN:
case STRING:
case BYTES:
+ case ARRAY:
newRecord.put(fieldName(fieldNamePrefix, entry.getKey()), entry.getValue());
break;
case MAP:
@@ -189,6 +190,7 @@ public abstract class Flatten<R extends ConnectRecord<R>> implements Transformat
case BOOLEAN:
case STRING:
case BYTES:
+ case ARRAY:
newSchema.field(fieldName, convertFieldSchema(field.schema(), fieldIsOptional, fieldDefaultValue));
break;
case STRUCT:
@@ -237,6 +239,7 @@ public abstract class Flatten<R extends ConnectRecord<R>> implements Transformat
case BOOLEAN:
case STRING:
case BYTES:
+ case ARRAY:
newRecord.put(fieldName, record.get(field));
break;
case STRUCT:
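
For context, a minimal usage sketch of the behavior described in the updated OVERVIEW_DOC above: nested Maps (and Structs) are flattened into delimiter-joined field names, while array-valued fields are now passed through unchanged. This is not part of the patch; the driver class name, topic name, and field names below are illustrative only.

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.connect.source.SourceRecord;
    import org.apache.kafka.connect.transforms.Flatten;

    // Hypothetical driver class, not part of the Kafka code base.
    public class FlattenArraySketch {
        public static void main(String[] args) {
            Flatten<SourceRecord> xform = new Flatten.Value<>();
            // "delimiter" is the transform's config key; "." is also the default.
            xform.configure(Collections.singletonMap("delimiter", "."));

            Map<String, Object> nested = new HashMap<>();
            nested.put("id", 42);
            nested.put("tags", Arrays.asList("a", "b"));   // array value: passed through untouched

            Object value = Collections.singletonMap("outer", nested);
            SourceRecord flattened = xform.apply(
                    new SourceRecord(null, null, "topic", 0, null, value));

            // Schemaless result is a Map with keys "outer.id" and "outer.tags";
            // the list under "outer.tags" is left as-is.
            System.out.println(flattened.value());
        }
    }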

SchemaUtil.java

@@ -24,7 +24,13 @@ import java.util.Map;
public class SchemaUtil {
public static SchemaBuilder copySchemaBasics(Schema source) {
- return copySchemaBasics(source, new SchemaBuilder(source.type()));
+ SchemaBuilder builder;
+ if (source.type() == Schema.Type.ARRAY) {
+ builder = SchemaBuilder.array(source.valueSchema());
+ } else {
+ builder = new SchemaBuilder(source.type());
+ }
+ return copySchemaBasics(source, builder);
}
public static SchemaBuilder copySchemaBasics(Schema source, SchemaBuilder builder) {
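
A short sketch (not part of the patch; the class name is made up) of why the ARRAY branch above is needed: a SchemaBuilder constructed directly from Schema.Type.ARRAY carries no element schema, so copying an array field's schema that way would lose the element type, whereas SchemaBuilder.array(source.valueSchema()) carries it over before the name, doc, and other basics are copied.

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;

    // Hypothetical demo class.
    public class ArraySchemaCopySketch {
        public static void main(String[] args) {
            Schema source = SchemaBuilder.array(Schema.STRING_SCHEMA).name("tags").build();

            // Pre-patch path: the builder knows it is an ARRAY but has no element schema.
            SchemaBuilder bare = new SchemaBuilder(Schema.Type.ARRAY);
            System.out.println(bare.valueSchema());     // null

            // Patched path: the builder is seeded with the source's element schema,
            // so copySchemaBasics(source, builder) can rebuild an equivalent array schema.
            SchemaBuilder seeded = SchemaBuilder.array(source.valueSchema());
            System.out.println(seeded.valueSchema());   // the STRING element schema
        }
    }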

FlattenTest.java

@@ -263,11 +263,29 @@ public class FlattenTest {
}
@Test
- public void testUnsupportedTypeInMap() {
- xformValue.configure(Collections.<String, String>emptyMap());
- Object value = Collections.singletonMap("foo", Arrays.asList("bar", "baz"));
- assertThrows(DataException.class, () -> xformValue.apply(new SourceRecord(null, null,
- "topic", 0, null, value)));
+ public void testSchemalessArray() {
+ xformValue.configure(Collections.emptyMap());
+ Object value = Collections.singletonMap("foo", Arrays.asList("bar", Collections.singletonMap("baz", Collections.singletonMap("lfg", "lfg"))));
+ assertEquals(value, xformValue.apply(new SourceRecord(null, null, "topic", null, null, null, value)).value());
+ }
+ @Test
+ public void testArrayWithSchema() {
+ xformValue.configure(Collections.emptyMap());
+ Schema nestedStructSchema = SchemaBuilder.struct().field("lfg", Schema.STRING_SCHEMA).build();
+ Schema innerStructSchema = SchemaBuilder.struct().field("baz", nestedStructSchema).build();
+ Schema structSchema = SchemaBuilder.struct()
+ .field("foo", SchemaBuilder.array(innerStructSchema).doc("durk").build())
+ .build();
+ Struct nestedValue = new Struct(nestedStructSchema);
+ nestedValue.put("lfg", "lfg");
+ Struct innerValue = new Struct(innerStructSchema);
+ innerValue.put("baz", nestedValue);
+ Struct value = new Struct(structSchema);
+ value.put("foo", Collections.singletonList(innerValue));
+ SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", null, null, structSchema, value));
+ assertEquals(value, transformed.value());
+ assertEquals(structSchema, transformed.valueSchema());
}
@Test