diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java index 92d69f8adc1..12ddfed0b92 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java @@ -54,6 +54,7 @@ public abstract class ReplaceField> implements Transf String INCLUDE = "include"; String RENAME = "renames"; + String REPLACE_NULL_WITH_DEFAULT_CONFIG = "replace.null.with.default"; } public static final ConfigDef CONFIG_DEF = new ConfigDef() @@ -76,7 +77,9 @@ public abstract class ReplaceField> implements Transf public String toString() { return "list of colon-delimited pairs, e.g. foo:bar,abc:xyz"; } - }, ConfigDef.Importance.MEDIUM, "Field rename mappings."); + }, ConfigDef.Importance.MEDIUM, "Field rename mappings.") + .define(ConfigName.REPLACE_NULL_WITH_DEFAULT_CONFIG, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, + "Whether to replace fields that have a default value and that are null to the default value. When set to true, the default value is used, otherwise null is used."); private static final String PURPOSE = "field replacement"; @@ -84,6 +87,7 @@ public abstract class ReplaceField> implements Transf private Set include; private Map renames; private Map reverseRenames; + private boolean replaceNullWithDefault; private Cache schemaUpdateCache; @@ -103,6 +107,7 @@ public abstract class ReplaceField> implements Transf include = new HashSet<>(config.getList(ConfigName.INCLUDE)); renames = parseRenameMappings(config.getList(ConfigName.RENAME)); reverseRenames = invert(renames); + replaceNullWithDefault = config.getBoolean(ConfigName.REPLACE_NULL_WITH_DEFAULT_CONFIG); schemaUpdateCache = new SynchronizedCache<>(new LRUCache<>(16)); } @@ -180,7 +185,7 @@ public abstract class ReplaceField> implements Transf final Struct updatedValue = new Struct(updatedSchema); for (Field field : updatedSchema.fields()) { - final Object fieldValue = value.get(reverseRenamed(field.name())); + final Object fieldValue = replaceNullWithDefault ? value.get(reverseRenamed(field.name())) : value.getWithoutDefault(reverseRenamed(field.name())); updatedValue.put(field.name(), fieldValue); } diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java index 454ea38f572..24cdec2249a 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java @@ -43,14 +43,18 @@ public class ValueToKey> implements Transformation public static final String OVERVIEW_DOC = "Replace the record key with a new key formed from a subset of fields in the record value."; public static final String FIELDS_CONFIG = "fields"; + public static final String REPLACE_NULL_WITH_DEFAULT_CONFIG = "replace.null.with.default"; public static final ConfigDef CONFIG_DEF = new ConfigDef() .define(FIELDS_CONFIG, ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE, new NonEmptyListValidator(), ConfigDef.Importance.HIGH, - "Field names on the record value to extract as the record key."); + "Field names on the record value to extract as the record key.") + .define(REPLACE_NULL_WITH_DEFAULT_CONFIG, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, + "Whether to replace fields that have a default value and that are null to the default value. When set to true, the default value is used, otherwise null is used."); private static final String PURPOSE = "copying fields from value to key"; private List fields; + private boolean replaceNullWithDefault; private Cache valueToKeySchemaCache; @@ -63,6 +67,7 @@ public class ValueToKey> implements Transformation public void configure(Map configs) { final SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs); fields = config.getList(FIELDS_CONFIG); + replaceNullWithDefault = config.getBoolean(REPLACE_NULL_WITH_DEFAULT_CONFIG); valueToKeySchemaCache = new SynchronizedCache<>(new LRUCache<>(16)); } @@ -103,7 +108,7 @@ public class ValueToKey> implements Transformation final Struct key = new Struct(keySchema); for (String field : fields) { - key.put(field, value.get(field)); + key.put(field, replaceNullWithDefault ? value.get(field) : value.getWithoutDefault(field)); } return record.newRecord(record.topic(), record.kafkaPartition(), keySchema, key, value.schema(), value, record.timestamp()); diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java index 53566cada7c..31c24ca5b8a 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java @@ -24,16 +24,28 @@ import org.apache.kafka.connect.sink.SinkRecord; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import java.util.HashMap; import java.util.Map; +import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; public class ReplaceFieldTest { + private final ReplaceField xformKey = new ReplaceField.Key<>(); private final ReplaceField xform = new ReplaceField.Value<>(); + public static Stream data() { + return Stream.of( + Arguments.of(false, null), + Arguments.of(true, 42) + ); + } + @AfterEach public void teardown() { xform.close(); @@ -176,4 +188,54 @@ public class ReplaceFieldTest { public void testReplaceFieldVersionRetrievedFromAppInfoParser() { assertEquals(AppInfoParser.getVersion(), xform.version()); } + + @ParameterizedTest + @MethodSource("data") + public void testReplaceNullWithDefaultConfigOnValue(boolean replaceNullWithDefault, Object expectedValue) { + final Map props = new HashMap<>(); + props.put("include", "abc"); + props.put("renames", "abc:optional_with_default"); + props.put("replace.null.with.default", String.valueOf(replaceNullWithDefault)); + + xform.configure(props); + + final Schema valueSchema = SchemaBuilder.struct() + .field("abc", SchemaBuilder.int32().optional().defaultValue(42).build()) + .build(); + + final Struct value = new Struct(valueSchema).put("abc", null); + + final SinkRecord record = new SinkRecord("test", 0, null, null, valueSchema, value, 0); + final SinkRecord transformedRecord = xform.apply(record); + + final Struct updatedValue = (Struct) transformedRecord.value(); + + assertEquals(1, updatedValue.schema().fields().size()); + assertEquals(expectedValue, updatedValue.getWithoutDefault("optional_with_default")); + } + + @ParameterizedTest + @MethodSource("data") + public void testReplaceNullWithDefaultConfigOnKey(boolean replaceNullWithDefault, Object expectedValue) { + final Map props = new HashMap<>(); + props.put("include", "abc"); + props.put("renames", "abc:optional_with_default"); + props.put("replace.null.with.default", String.valueOf(replaceNullWithDefault)); + + xformKey.configure(props); + + final Schema keySchema = SchemaBuilder.struct() + .field("abc", SchemaBuilder.int32().optional().defaultValue(42).build()) + .build(); + + final Struct key = new Struct(keySchema).put("abc", null); + + final SinkRecord record = new SinkRecord("test", 0, keySchema, key, null, null, 0); + final SinkRecord transformedRecord = xformKey.apply(record); + + final Struct updatedKey = (Struct) transformedRecord.key(); + + assertEquals(1, updatedKey.schema().fields().size()); + assertEquals(expectedValue, updatedKey.getWithoutDefault("optional_with_default")); + } } diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java index a7d032009e5..df528cf518a 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java @@ -25,9 +25,14 @@ import org.apache.kafka.connect.sink.SinkRecord; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import java.util.Collections; import java.util.HashMap; +import java.util.Map; +import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; @@ -36,6 +41,13 @@ import static org.junit.jupiter.api.Assertions.assertThrows; public class ValueToKeyTest { private final ValueToKey xform = new ValueToKey<>(); + public static Stream data() { + return Stream.of( + Arguments.of(false, null), + Arguments.of(true, 42) + ); + } + @AfterEach public void teardown() { xform.close(); @@ -113,4 +125,23 @@ public class ValueToKeyTest { public void testValueToKeyVersionRetrievedFromAppInfoParser() { assertEquals(AppInfoParser.getVersion(), xform.version()); } + + @ParameterizedTest + @MethodSource("data") + public void testReplaceNullWithDefaultConfig(boolean replaceNullWithDefault, Object expectedValue) { + Map config = new HashMap<>(); + config.put("fields", "optional_with_default"); + config.put("replace.null.with.default", replaceNullWithDefault); + xform.configure(config); + + final Schema valueSchema = SchemaBuilder.struct() + .field("optional_with_default", SchemaBuilder.int32().optional().defaultValue(42).build()) + .build(); + final Struct value = new Struct(valueSchema).put("optional_with_default", null); + + final SinkRecord record = new SinkRecord("", 0, null, null, valueSchema, value, 0); + final SinkRecord transformedRecord = xform.apply(record); + + assertEquals(expectedValue, ((Struct) transformedRecord.key()).getWithoutDefault("optional_with_default")); + } }