KAFKA-8713: JsonConverter replace.null.with.default should prevent emitting default for Struct fields (#13781)

Co-authored-by: GeunJae Jeon <krespo>
Reviewers: Mickael Maison <mickael.maison@gmail.com>
This commit is contained in:
Greg Harris 2023-05-31 06:48:03 -07:00 committed by GitHub
parent 08d47cb41d
commit 1957be19d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 40 additions and 1 deletions

View File

@ -661,7 +661,7 @@ public class JsonConverter implements Converter, HeaderConverter {
throw new DataException("Mismatching schema.");
ObjectNode obj = JSON_NODE_FACTORY.objectNode();
for (Field field : schema.fields()) {
obj.set(field.name(), convertToJson(field.schema(), struct.get(field)));
obj.set(field.name(), convertToJson(field.schema(), struct.getWithoutDefault(field.name())));
}
return obj;
}

View File

@ -921,6 +921,45 @@ public class JsonConverterTest {
assertNull(sav.value());
}
@Test
public void serializeFieldNullToDefault() {
converter.configure(Collections.singletonMap(JsonConverterConfig.REPLACE_NULL_WITH_DEFAULT_CONFIG, true), false);
Schema schema = SchemaBuilder.string().optional().defaultValue("default").build();
Schema structSchema = SchemaBuilder.struct().field("field1", schema).build();
JsonNode converted = parse(converter.fromConnectData(TOPIC, structSchema, new Struct(structSchema)));
JsonNode expected = parse("{\"schema\":{\"type\":\"struct\",\"fields\":[{\"field\":\"field1\",\"type\":\"string\",\"optional\":true,\"default\":\"default\"}],\"optional\":false},\"payload\":{\"field1\":\"default\"}}");
assertEquals(expected, converted);
}
@Test
public void serializeFieldNullToNull() {
converter.configure(Collections.singletonMap(JsonConverterConfig.REPLACE_NULL_WITH_DEFAULT_CONFIG, false), false);
Schema schema = SchemaBuilder.string().optional().defaultValue("default").build();
Schema structSchema = SchemaBuilder.struct().field("field1", schema).build();
JsonNode converted = parse(converter.fromConnectData(TOPIC, structSchema, new Struct(structSchema)));
JsonNode expected = parse("{\"schema\":{\"type\":\"struct\",\"fields\":[{\"field\":\"field1\",\"type\":\"string\",\"optional\":true,\"default\":\"default\"}],\"optional\":false},\"payload\":{\"field1\":null}}");
assertEquals(expected, converted);
}
@Test
public void deserializeFieldNullToDefault() {
converter.configure(Collections.singletonMap(JsonConverterConfig.REPLACE_NULL_WITH_DEFAULT_CONFIG, true), false);
String value = "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"field\":\"field1\",\"type\":\"string\",\"optional\":true,\"default\":\"default\"}],\"optional\":false},\"payload\":{\"field1\":null}}";
SchemaAndValue sav = converter.toConnectData(TOPIC, null, value.getBytes());
Schema schema = SchemaBuilder.string().optional().defaultValue("default").build();
Schema structSchema = SchemaBuilder.struct().field("field1", schema).build();
assertEquals(new Struct(structSchema).put("field1", "default"), sav.value());
}
@Test
public void deserializeFieldNullToNull() {
converter.configure(Collections.singletonMap(JsonConverterConfig.REPLACE_NULL_WITH_DEFAULT_CONFIG, false), false);
String value = "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"field\":\"field1\",\"type\":\"string\",\"optional\":true,\"default\":\"default\"}],\"optional\":false},\"payload\":{\"field1\":null}}";
SchemaAndValue sav = converter.toConnectData(TOPIC, null, value.getBytes());
Schema schema = SchemaBuilder.string().optional().defaultValue("default").build();
Schema structSchema = SchemaBuilder.struct().field("field1", schema).build();
assertEquals(new Struct(structSchema), sav.value());
}
private JsonNode parse(byte[] json) {
try {