KAFKA-17097 Add replace.null.with.default configuration to ValueToKey and ReplaceField (#16571)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
PoAn Yang 2024-07-15 18:12:59 +08:00 committed by GitHub
parent 4cec840baf
commit 6e9c039eea
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 107 additions and 4 deletions

View File

@ -54,6 +54,7 @@ public abstract class ReplaceField<R extends ConnectRecord<R>> implements Transf
String INCLUDE = "include";
String RENAME = "renames";
String REPLACE_NULL_WITH_DEFAULT_CONFIG = "replace.null.with.default";
}
public static final ConfigDef CONFIG_DEF = new ConfigDef()
@ -76,7 +77,9 @@ public abstract class ReplaceField<R extends ConnectRecord<R>> implements Transf
public String toString() {
return "list of colon-delimited pairs, e.g. <code>foo:bar,abc:xyz</code>";
}
}, ConfigDef.Importance.MEDIUM, "Field rename mappings.");
}, ConfigDef.Importance.MEDIUM, "Field rename mappings.")
.define(ConfigName.REPLACE_NULL_WITH_DEFAULT_CONFIG, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM,
"Whether to replace fields that have a default value and that are null to the default value. When set to true, the default value is used, otherwise null is used.");
private static final String PURPOSE = "field replacement";
@ -84,6 +87,7 @@ public abstract class ReplaceField<R extends ConnectRecord<R>> implements Transf
private Set<String> include;
private Map<String, String> renames;
private Map<String, String> reverseRenames;
private boolean replaceNullWithDefault;
private Cache<Schema, Schema> schemaUpdateCache;
@ -103,6 +107,7 @@ public abstract class ReplaceField<R extends ConnectRecord<R>> implements Transf
include = new HashSet<>(config.getList(ConfigName.INCLUDE));
renames = parseRenameMappings(config.getList(ConfigName.RENAME));
reverseRenames = invert(renames);
replaceNullWithDefault = config.getBoolean(ConfigName.REPLACE_NULL_WITH_DEFAULT_CONFIG);
schemaUpdateCache = new SynchronizedCache<>(new LRUCache<>(16));
}
@ -180,7 +185,7 @@ public abstract class ReplaceField<R extends ConnectRecord<R>> implements Transf
final Struct updatedValue = new Struct(updatedSchema);
for (Field field : updatedSchema.fields()) {
final Object fieldValue = value.get(reverseRenamed(field.name()));
final Object fieldValue = replaceNullWithDefault ? value.get(reverseRenamed(field.name())) : value.getWithoutDefault(reverseRenamed(field.name()));
updatedValue.put(field.name(), fieldValue);
}

View File

@ -43,14 +43,18 @@ public class ValueToKey<R extends ConnectRecord<R>> implements Transformation<R>
public static final String OVERVIEW_DOC = "Replace the record key with a new key formed from a subset of fields in the record value.";
public static final String FIELDS_CONFIG = "fields";
public static final String REPLACE_NULL_WITH_DEFAULT_CONFIG = "replace.null.with.default";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(FIELDS_CONFIG, ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE, new NonEmptyListValidator(), ConfigDef.Importance.HIGH,
"Field names on the record value to extract as the record key.");
"Field names on the record value to extract as the record key.")
.define(REPLACE_NULL_WITH_DEFAULT_CONFIG, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM,
"Whether to replace fields that have a default value and that are null to the default value. When set to true, the default value is used, otherwise null is used.");
private static final String PURPOSE = "copying fields from value to key";
private List<String> fields;
private boolean replaceNullWithDefault;
private Cache<Schema, Schema> valueToKeySchemaCache;
@ -63,6 +67,7 @@ public class ValueToKey<R extends ConnectRecord<R>> implements Transformation<R>
public void configure(Map<String, ?> configs) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs);
fields = config.getList(FIELDS_CONFIG);
replaceNullWithDefault = config.getBoolean(REPLACE_NULL_WITH_DEFAULT_CONFIG);
valueToKeySchemaCache = new SynchronizedCache<>(new LRUCache<>(16));
}
@ -103,7 +108,7 @@ public class ValueToKey<R extends ConnectRecord<R>> implements Transformation<R>
final Struct key = new Struct(keySchema);
for (String field : fields) {
key.put(field, value.get(field));
key.put(field, replaceNullWithDefault ? value.get(field) : value.getWithoutDefault(field));
}
return record.newRecord(record.topic(), record.kafkaPartition(), keySchema, key, value.schema(), value, record.timestamp());

View File

@ -24,16 +24,28 @@ import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
public class ReplaceFieldTest {
private final ReplaceField<SinkRecord> xformKey = new ReplaceField.Key<>();
private final ReplaceField<SinkRecord> xform = new ReplaceField.Value<>();
public static Stream<Arguments> data() {
return Stream.of(
Arguments.of(false, null),
Arguments.of(true, 42)
);
}
@AfterEach
public void teardown() {
xform.close();
@ -176,4 +188,54 @@ public class ReplaceFieldTest {
public void testReplaceFieldVersionRetrievedFromAppInfoParser() {
assertEquals(AppInfoParser.getVersion(), xform.version());
}
@ParameterizedTest
@MethodSource("data")
public void testReplaceNullWithDefaultConfigOnValue(boolean replaceNullWithDefault, Object expectedValue) {
final Map<String, String> props = new HashMap<>();
props.put("include", "abc");
props.put("renames", "abc:optional_with_default");
props.put("replace.null.with.default", String.valueOf(replaceNullWithDefault));
xform.configure(props);
final Schema valueSchema = SchemaBuilder.struct()
.field("abc", SchemaBuilder.int32().optional().defaultValue(42).build())
.build();
final Struct value = new Struct(valueSchema).put("abc", null);
final SinkRecord record = new SinkRecord("test", 0, null, null, valueSchema, value, 0);
final SinkRecord transformedRecord = xform.apply(record);
final Struct updatedValue = (Struct) transformedRecord.value();
assertEquals(1, updatedValue.schema().fields().size());
assertEquals(expectedValue, updatedValue.getWithoutDefault("optional_with_default"));
}
@ParameterizedTest
@MethodSource("data")
public void testReplaceNullWithDefaultConfigOnKey(boolean replaceNullWithDefault, Object expectedValue) {
final Map<String, String> props = new HashMap<>();
props.put("include", "abc");
props.put("renames", "abc:optional_with_default");
props.put("replace.null.with.default", String.valueOf(replaceNullWithDefault));
xformKey.configure(props);
final Schema keySchema = SchemaBuilder.struct()
.field("abc", SchemaBuilder.int32().optional().defaultValue(42).build())
.build();
final Struct key = new Struct(keySchema).put("abc", null);
final SinkRecord record = new SinkRecord("test", 0, keySchema, key, null, null, 0);
final SinkRecord transformedRecord = xformKey.apply(record);
final Struct updatedKey = (Struct) transformedRecord.key();
assertEquals(1, updatedKey.schema().fields().size());
assertEquals(expectedValue, updatedKey.getWithoutDefault("optional_with_default"));
}
}

View File

@ -25,9 +25,14 @@ import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
@ -36,6 +41,13 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
public class ValueToKeyTest {
private final ValueToKey<SinkRecord> xform = new ValueToKey<>();
public static Stream<Arguments> data() {
return Stream.of(
Arguments.of(false, null),
Arguments.of(true, 42)
);
}
@AfterEach
public void teardown() {
xform.close();
@ -113,4 +125,23 @@ public class ValueToKeyTest {
public void testValueToKeyVersionRetrievedFromAppInfoParser() {
assertEquals(AppInfoParser.getVersion(), xform.version());
}
@ParameterizedTest
@MethodSource("data")
public void testReplaceNullWithDefaultConfig(boolean replaceNullWithDefault, Object expectedValue) {
Map<String, Object> config = new HashMap<>();
config.put("fields", "optional_with_default");
config.put("replace.null.with.default", replaceNullWithDefault);
xform.configure(config);
final Schema valueSchema = SchemaBuilder.struct()
.field("optional_with_default", SchemaBuilder.int32().optional().defaultValue(42).build())
.build();
final Struct value = new Struct(valueSchema).put("optional_with_default", null);
final SinkRecord record = new SinkRecord("", 0, null, null, valueSchema, value, 0);
final SinkRecord transformedRecord = xform.apply(record);
assertEquals(expectedValue, ((Struct) transformedRecord.key()).getWithoutDefault("optional_with_default"));
}
}