From 1957be19d963d3416e3d75a763f0248a51604efc Mon Sep 17 00:00:00 2001 From: Greg Harris Date: Wed, 31 May 2023 06:48:03 -0700 Subject: [PATCH] KAFKA-8713: JsonConverter replace.null.with.default should prevent emitting default for Struct fields (#13781) Co-authored-by: GeunJae Jeon Reviewers: Mickael Maison --- .../kafka/connect/json/JsonConverter.java | 2 +- .../kafka/connect/json/JsonConverterTest.java | 39 +++++++++++++++++++ 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java index 99a1a846030..ae5a6989b2a 100644 --- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java +++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java @@ -661,7 +661,7 @@ public class JsonConverter implements Converter, HeaderConverter { throw new DataException("Mismatching schema."); ObjectNode obj = JSON_NODE_FACTORY.objectNode(); for (Field field : schema.fields()) { - obj.set(field.name(), convertToJson(field.schema(), struct.get(field))); + obj.set(field.name(), convertToJson(field.schema(), struct.getWithoutDefault(field.name()))); } return obj; } diff --git a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java index 4af1c400867..d3d8999cc53 100644 --- a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java +++ b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java @@ -921,6 +921,45 @@ public class JsonConverterTest { assertNull(sav.value()); } + @Test + public void serializeFieldNullToDefault() { + converter.configure(Collections.singletonMap(JsonConverterConfig.REPLACE_NULL_WITH_DEFAULT_CONFIG, true), false); + Schema schema = SchemaBuilder.string().optional().defaultValue("default").build(); + Schema structSchema = SchemaBuilder.struct().field("field1", schema).build(); + JsonNode converted = parse(converter.fromConnectData(TOPIC, structSchema, new Struct(structSchema))); + JsonNode expected = parse("{\"schema\":{\"type\":\"struct\",\"fields\":[{\"field\":\"field1\",\"type\":\"string\",\"optional\":true,\"default\":\"default\"}],\"optional\":false},\"payload\":{\"field1\":\"default\"}}"); + assertEquals(expected, converted); + } + + @Test + public void serializeFieldNullToNull() { + converter.configure(Collections.singletonMap(JsonConverterConfig.REPLACE_NULL_WITH_DEFAULT_CONFIG, false), false); + Schema schema = SchemaBuilder.string().optional().defaultValue("default").build(); + Schema structSchema = SchemaBuilder.struct().field("field1", schema).build(); + JsonNode converted = parse(converter.fromConnectData(TOPIC, structSchema, new Struct(structSchema))); + JsonNode expected = parse("{\"schema\":{\"type\":\"struct\",\"fields\":[{\"field\":\"field1\",\"type\":\"string\",\"optional\":true,\"default\":\"default\"}],\"optional\":false},\"payload\":{\"field1\":null}}"); + assertEquals(expected, converted); + } + + @Test + public void deserializeFieldNullToDefault() { + converter.configure(Collections.singletonMap(JsonConverterConfig.REPLACE_NULL_WITH_DEFAULT_CONFIG, true), false); + String value = "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"field\":\"field1\",\"type\":\"string\",\"optional\":true,\"default\":\"default\"}],\"optional\":false},\"payload\":{\"field1\":null}}"; + SchemaAndValue sav = converter.toConnectData(TOPIC, null, value.getBytes()); + Schema schema = SchemaBuilder.string().optional().defaultValue("default").build(); + Schema structSchema = SchemaBuilder.struct().field("field1", schema).build(); + assertEquals(new Struct(structSchema).put("field1", "default"), sav.value()); + } + + @Test + public void deserializeFieldNullToNull() { + converter.configure(Collections.singletonMap(JsonConverterConfig.REPLACE_NULL_WITH_DEFAULT_CONFIG, false), false); + String value = "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"field\":\"field1\",\"type\":\"string\",\"optional\":true,\"default\":\"default\"}],\"optional\":false},\"payload\":{\"field1\":null}}"; + SchemaAndValue sav = converter.toConnectData(TOPIC, null, value.getBytes()); + Schema schema = SchemaBuilder.string().optional().defaultValue("default").build(); + Schema structSchema = SchemaBuilder.struct().field("field1", schema).build(); + assertEquals(new Struct(structSchema), sav.value()); + } private JsonNode parse(byte[] json) { try {