mirror of https://github.com/apache/kafka.git
MINOR: Cleanup Connect Module (2/n) (#19871)
Now that Kafka support Java 17, this PR makes some changes in connect module. The changes in this PR are limited to only some files. A future PR(s) shall follow. The changes mostly include: - Collections.emptyList(), Collections.singletonList() and Arrays.asList() are replaced with List.of() - Collections.emptyMap() and Collections.singletonMap() are replaced with Map.of() - Collections.singleton() is replaced with Set.of() Modules target: test-plugins, transforms Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
8a5549ca9b
commit
50b748b024
|
@ -24,7 +24,6 @@ import org.apache.kafka.connect.connector.Task;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executors;
|
||||
|
@ -91,7 +90,7 @@ public class MockConnector extends Connector {
|
|||
@Override
|
||||
public List<Map<String, String>> taskConfigs(int maxTasks) {
|
||||
log.debug("Creating single task for MockConnector");
|
||||
return Collections.singletonList(config);
|
||||
return List.of(config);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -23,7 +23,6 @@ import org.apache.kafka.connect.source.SourceTask;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
|
@ -67,7 +66,7 @@ public class MockSourceTask extends SourceTask {
|
|||
throw new RuntimeException();
|
||||
}
|
||||
}
|
||||
return Collections.emptyList();
|
||||
return List.of();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -27,7 +27,6 @@ import org.apache.kafka.server.util.ThroughputThrottler;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
|
@ -104,7 +103,7 @@ public class SchemaSourceTask extends SourceTask {
|
|||
}
|
||||
|
||||
throttler = new ThroughputThrottler(throughput, System.currentTimeMillis());
|
||||
partition = Collections.singletonMap(ID_FIELD, id);
|
||||
partition = Map.of(ID_FIELD, id);
|
||||
Map<String, Object> previousOffset = this.context.offsetStorageReader().offset(partition);
|
||||
if (previousOffset != null) {
|
||||
seqno = (Long) previousOffset.get(SEQNO_FIELD) + 1;
|
||||
|
@ -124,7 +123,7 @@ public class SchemaSourceTask extends SourceTask {
|
|||
throttler.throttle();
|
||||
}
|
||||
|
||||
Map<String, Long> ccOffset = Collections.singletonMap(SEQNO_FIELD, seqno);
|
||||
Map<String, Long> ccOffset = Map.of(SEQNO_FIELD, seqno);
|
||||
int partitionVal = (int) (seqno % partitionCount);
|
||||
final Struct data;
|
||||
final SourceRecord srcRecord;
|
||||
|
@ -158,10 +157,10 @@ public class SchemaSourceTask extends SourceTask {
|
|||
System.out.println("{\"task\": " + id + ", \"seqno\": " + seqno + "}");
|
||||
seqno++;
|
||||
count++;
|
||||
return Collections.singletonList(srcRecord);
|
||||
return List.of(srcRecord);
|
||||
} else {
|
||||
throttler.throttle();
|
||||
return Collections.emptyList();
|
||||
return List.of();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -31,7 +31,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -94,7 +93,7 @@ public class VerifiableSourceTask extends SourceTask {
|
|||
throw new ConnectException("Invalid VerifiableSourceTask configuration", e);
|
||||
}
|
||||
|
||||
partition = Collections.singletonMap(ID_FIELD, id);
|
||||
partition = Map.of(ID_FIELD, id);
|
||||
Map<String, Object> previousOffset = this.context.offsetStorageReader().offset(partition);
|
||||
if (previousOffset != null)
|
||||
seqno = (Long) previousOffset.get(SEQNO_FIELD) + 1;
|
||||
|
@ -129,11 +128,11 @@ public class VerifiableSourceTask extends SourceTask {
|
|||
}
|
||||
System.out.println(dataJson);
|
||||
|
||||
Map<String, Long> ccOffset = Collections.singletonMap(SEQNO_FIELD, seqno);
|
||||
Map<String, Long> ccOffset = Map.of(SEQNO_FIELD, seqno);
|
||||
Schema valueSchema = completeRecordData ? COMPLETE_VALUE_SCHEMA : Schema.INT64_SCHEMA;
|
||||
Object value = completeRecordData ? completeValue(data) : seqno;
|
||||
SourceRecord srcRecord = new SourceRecord(partition, ccOffset, topic, Schema.INT32_SCHEMA, id, valueSchema, value);
|
||||
List<SourceRecord> result = Collections.singletonList(srcRecord);
|
||||
List<SourceRecord> result = List.of(srcRecord);
|
||||
seqno++;
|
||||
return result;
|
||||
}
|
||||
|
|
|
@ -231,38 +231,26 @@ public abstract class Cast<R extends ConnectRecord<R>> implements Transformation
|
|||
}
|
||||
|
||||
private SchemaBuilder convertFieldType(Schema.Type type) {
|
||||
switch (type) {
|
||||
case INT8:
|
||||
return SchemaBuilder.int8();
|
||||
case INT16:
|
||||
return SchemaBuilder.int16();
|
||||
case INT32:
|
||||
return SchemaBuilder.int32();
|
||||
case INT64:
|
||||
return SchemaBuilder.int64();
|
||||
case FLOAT32:
|
||||
return SchemaBuilder.float32();
|
||||
case FLOAT64:
|
||||
return SchemaBuilder.float64();
|
||||
case BOOLEAN:
|
||||
return SchemaBuilder.bool();
|
||||
case STRING:
|
||||
return SchemaBuilder.string();
|
||||
default:
|
||||
throw new DataException("Unexpected type in Cast transformation: " + type);
|
||||
}
|
||||
return switch (type) {
|
||||
case INT8 -> SchemaBuilder.int8();
|
||||
case INT16 -> SchemaBuilder.int16();
|
||||
case INT32 -> SchemaBuilder.int32();
|
||||
case INT64 -> SchemaBuilder.int64();
|
||||
case FLOAT32 -> SchemaBuilder.float32();
|
||||
case FLOAT64 -> SchemaBuilder.float64();
|
||||
case BOOLEAN -> SchemaBuilder.bool();
|
||||
case STRING -> SchemaBuilder.string();
|
||||
default -> throw new DataException("Unexpected type in Cast transformation: " + type);
|
||||
};
|
||||
}
|
||||
|
||||
private static Object encodeLogicalType(Schema schema, Object value) {
|
||||
switch (schema.name()) {
|
||||
case Date.LOGICAL_NAME:
|
||||
return Date.fromLogical(schema, (java.util.Date) value);
|
||||
case Time.LOGICAL_NAME:
|
||||
return Time.fromLogical(schema, (java.util.Date) value);
|
||||
case Timestamp.LOGICAL_NAME:
|
||||
return Timestamp.fromLogical(schema, (java.util.Date) value);
|
||||
}
|
||||
return value;
|
||||
return switch (schema.name()) {
|
||||
case Date.LOGICAL_NAME -> Date.fromLogical(schema, (java.util.Date) value);
|
||||
case Time.LOGICAL_NAME -> Time.fromLogical(schema, (java.util.Date) value);
|
||||
case Timestamp.LOGICAL_NAME -> Timestamp.fromLogical(schema, (java.util.Date) value);
|
||||
default -> value;
|
||||
};
|
||||
}
|
||||
|
||||
private static Object castValueToType(Schema schema, Object value, Schema.Type targetType) {
|
||||
|
@ -283,26 +271,17 @@ public abstract class Cast<R extends ConnectRecord<R>> implements Transformation
|
|||
value = encodeLogicalType(schema, value);
|
||||
}
|
||||
|
||||
switch (targetType) {
|
||||
case INT8:
|
||||
return castToInt8(value);
|
||||
case INT16:
|
||||
return castToInt16(value);
|
||||
case INT32:
|
||||
return castToInt32(value);
|
||||
case INT64:
|
||||
return castToInt64(value);
|
||||
case FLOAT32:
|
||||
return castToFloat32(value);
|
||||
case FLOAT64:
|
||||
return castToFloat64(value);
|
||||
case BOOLEAN:
|
||||
return castToBoolean(value);
|
||||
case STRING:
|
||||
return castToString(value);
|
||||
default:
|
||||
throw new DataException(targetType + " is not supported in the Cast transformation.");
|
||||
}
|
||||
return switch (targetType) {
|
||||
case INT8 -> castToInt8(value);
|
||||
case INT16 -> castToInt16(value);
|
||||
case INT32 -> castToInt32(value);
|
||||
case INT64 -> castToInt64(value);
|
||||
case FLOAT32 -> castToFloat32(value);
|
||||
case FLOAT64 -> castToFloat64(value);
|
||||
case BOOLEAN -> castToBoolean(value);
|
||||
case STRING -> castToString(value);
|
||||
default -> throw new DataException(targetType + " is not supported in the Cast transformation.");
|
||||
};
|
||||
} catch (NumberFormatException e) {
|
||||
throw new DataException("Value (" + value.toString() + ") was out of range for requested data type", e);
|
||||
}
|
||||
|
|
|
@ -87,14 +87,11 @@ public abstract class HeaderFrom<R extends ConnectRecord<R>> implements Transfor
|
|||
}
|
||||
|
||||
static Operation fromName(String name) {
|
||||
switch (name) {
|
||||
case MOVE_OPERATION:
|
||||
return MOVE;
|
||||
case COPY_OPERATION:
|
||||
return COPY;
|
||||
default:
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
return switch (name) {
|
||||
case MOVE_OPERATION -> MOVE;
|
||||
case COPY_OPERATION -> COPY;
|
||||
default -> throw new IllegalArgumentException();
|
||||
};
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
|
|
|
@ -65,32 +65,30 @@ public abstract class MaskField<R extends ConnectRecord<R>> implements Transform
|
|||
|
||||
private static final String PURPOSE = "mask fields";
|
||||
|
||||
private static final Map<Class<?>, Function<String, ?>> REPLACEMENT_MAPPING_FUNC = new HashMap<>();
|
||||
private static final Map<Class<?>, Object> PRIMITIVE_VALUE_MAPPING = new HashMap<>();
|
||||
|
||||
static {
|
||||
PRIMITIVE_VALUE_MAPPING.put(Boolean.class, Boolean.FALSE);
|
||||
PRIMITIVE_VALUE_MAPPING.put(Byte.class, (byte) 0);
|
||||
PRIMITIVE_VALUE_MAPPING.put(Short.class, (short) 0);
|
||||
PRIMITIVE_VALUE_MAPPING.put(Integer.class, 0);
|
||||
PRIMITIVE_VALUE_MAPPING.put(Long.class, 0L);
|
||||
PRIMITIVE_VALUE_MAPPING.put(Float.class, 0f);
|
||||
PRIMITIVE_VALUE_MAPPING.put(Double.class, 0d);
|
||||
PRIMITIVE_VALUE_MAPPING.put(BigInteger.class, BigInteger.ZERO);
|
||||
PRIMITIVE_VALUE_MAPPING.put(BigDecimal.class, BigDecimal.ZERO);
|
||||
PRIMITIVE_VALUE_MAPPING.put(Date.class, new Date(0));
|
||||
PRIMITIVE_VALUE_MAPPING.put(String.class, "");
|
||||
|
||||
REPLACEMENT_MAPPING_FUNC.put(Byte.class, v -> Values.convertToByte(null, v));
|
||||
REPLACEMENT_MAPPING_FUNC.put(Short.class, v -> Values.convertToShort(null, v));
|
||||
REPLACEMENT_MAPPING_FUNC.put(Integer.class, v -> Values.convertToInteger(null, v));
|
||||
REPLACEMENT_MAPPING_FUNC.put(Long.class, v -> Values.convertToLong(null, v));
|
||||
REPLACEMENT_MAPPING_FUNC.put(Float.class, v -> Values.convertToFloat(null, v));
|
||||
REPLACEMENT_MAPPING_FUNC.put(Double.class, v -> Values.convertToDouble(null, v));
|
||||
REPLACEMENT_MAPPING_FUNC.put(String.class, Function.identity());
|
||||
REPLACEMENT_MAPPING_FUNC.put(BigDecimal.class, BigDecimal::new);
|
||||
REPLACEMENT_MAPPING_FUNC.put(BigInteger.class, BigInteger::new);
|
||||
}
|
||||
private static final Map<Class<?>, Function<String, ?>> REPLACEMENT_MAPPING_FUNC = Map.of(
|
||||
Byte.class, v -> Values.convertToByte(null, v),
|
||||
Short.class, v -> Values.convertToShort(null, v),
|
||||
Integer.class, v -> Values.convertToInteger(null, v),
|
||||
Long.class, v -> Values.convertToLong(null, v),
|
||||
Float.class, v -> Values.convertToFloat(null, v),
|
||||
Double.class, v -> Values.convertToDouble(null, v),
|
||||
String.class, Function.identity(),
|
||||
BigDecimal.class, BigDecimal::new,
|
||||
BigInteger.class, BigInteger::new
|
||||
);
|
||||
private static final Map<Class<?>, Object> PRIMITIVE_VALUE_MAPPING = Map.ofEntries(
|
||||
Map.entry(Boolean.class, Boolean.FALSE),
|
||||
Map.entry(Byte.class, (byte) 0),
|
||||
Map.entry(Short.class, (short) 0),
|
||||
Map.entry(Integer.class, 0),
|
||||
Map.entry(Long.class, 0L),
|
||||
Map.entry(Float.class, 0f),
|
||||
Map.entry(Double.class, 0d),
|
||||
Map.entry(BigInteger.class, BigInteger.ZERO),
|
||||
Map.entry(BigDecimal.class, BigDecimal.ZERO),
|
||||
Map.entry(Date.class, new Date(0)),
|
||||
Map.entry(String.class, "")
|
||||
);
|
||||
|
||||
private Set<String> maskedFields;
|
||||
private String replacement;
|
||||
|
|
|
@ -31,7 +31,6 @@ import org.apache.kafka.connect.data.Struct;
|
|||
import org.apache.kafka.connect.transforms.util.SchemaUtil;
|
||||
import org.apache.kafka.connect.transforms.util.SimpleConfig;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
|
@ -56,11 +55,11 @@ public abstract class ReplaceField<R extends ConnectRecord<R>> implements Transf
|
|||
}
|
||||
|
||||
public static final ConfigDef CONFIG_DEF = new ConfigDef()
|
||||
.define(ConfigName.EXCLUDE, ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM,
|
||||
.define(ConfigName.EXCLUDE, ConfigDef.Type.LIST, List.of(), ConfigDef.Importance.MEDIUM,
|
||||
"Fields to exclude. This takes precedence over the fields to include.")
|
||||
.define(ConfigName.INCLUDE, ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM,
|
||||
.define(ConfigName.INCLUDE, ConfigDef.Type.LIST, List.of(), ConfigDef.Importance.MEDIUM,
|
||||
"Fields to include. If specified, only these fields will be used.")
|
||||
.define(ConfigName.RENAMES, ConfigDef.Type.LIST, Collections.emptyList(),
|
||||
.define(ConfigName.RENAMES, ConfigDef.Type.LIST, List.of(),
|
||||
ConfigDef.LambdaValidator.with(
|
||||
(name, value) -> {
|
||||
@SuppressWarnings("unchecked")
|
||||
|
|
|
@ -166,17 +166,15 @@ public abstract class TimestampConverter<R extends ConnectRecord<R>> implements
|
|||
public Date toRaw(Config config, Object orig) {
|
||||
if (!(orig instanceof Long unixTime))
|
||||
throw new DataException("Expected Unix timestamp to be a Long, but found " + orig.getClass());
|
||||
switch (config.unixPrecision) {
|
||||
case UNIX_PRECISION_SECONDS:
|
||||
return Timestamp.toLogical(Timestamp.SCHEMA, TimeUnit.SECONDS.toMillis(unixTime));
|
||||
case UNIX_PRECISION_MICROS:
|
||||
return Timestamp.toLogical(Timestamp.SCHEMA, TimeUnit.MICROSECONDS.toMillis(unixTime));
|
||||
case UNIX_PRECISION_NANOS:
|
||||
return Timestamp.toLogical(Timestamp.SCHEMA, TimeUnit.NANOSECONDS.toMillis(unixTime));
|
||||
case UNIX_PRECISION_MILLIS:
|
||||
default:
|
||||
return Timestamp.toLogical(Timestamp.SCHEMA, unixTime);
|
||||
}
|
||||
return switch (config.unixPrecision) {
|
||||
case UNIX_PRECISION_SECONDS ->
|
||||
Timestamp.toLogical(Timestamp.SCHEMA, TimeUnit.SECONDS.toMillis(unixTime));
|
||||
case UNIX_PRECISION_MICROS ->
|
||||
Timestamp.toLogical(Timestamp.SCHEMA, TimeUnit.MICROSECONDS.toMillis(unixTime));
|
||||
case UNIX_PRECISION_NANOS ->
|
||||
Timestamp.toLogical(Timestamp.SCHEMA, TimeUnit.NANOSECONDS.toMillis(unixTime));
|
||||
default -> Timestamp.toLogical(Timestamp.SCHEMA, unixTime);
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -186,18 +184,13 @@ public abstract class TimestampConverter<R extends ConnectRecord<R>> implements
|
|||
|
||||
@Override
|
||||
public Long toType(Config config, Date orig) {
|
||||
Long unixTimeMillis = Timestamp.fromLogical(Timestamp.SCHEMA, orig);
|
||||
switch (config.unixPrecision) {
|
||||
case UNIX_PRECISION_SECONDS:
|
||||
return TimeUnit.MILLISECONDS.toSeconds(unixTimeMillis);
|
||||
case UNIX_PRECISION_MICROS:
|
||||
return TimeUnit.MILLISECONDS.toMicros(unixTimeMillis);
|
||||
case UNIX_PRECISION_NANOS:
|
||||
return TimeUnit.MILLISECONDS.toNanos(unixTimeMillis);
|
||||
case UNIX_PRECISION_MILLIS:
|
||||
default:
|
||||
return unixTimeMillis;
|
||||
}
|
||||
long unixTimeMillis = Timestamp.fromLogical(Timestamp.SCHEMA, orig);
|
||||
return switch (config.unixPrecision) {
|
||||
case UNIX_PRECISION_SECONDS -> TimeUnit.MILLISECONDS.toSeconds(unixTimeMillis);
|
||||
case UNIX_PRECISION_MICROS -> TimeUnit.MILLISECONDS.toMicros(unixTimeMillis);
|
||||
case UNIX_PRECISION_NANOS -> TimeUnit.MILLISECONDS.toNanos(unixTimeMillis);
|
||||
default -> unixTimeMillis;
|
||||
};
|
||||
}
|
||||
});
|
||||
|
||||
|
|
|
@ -22,7 +22,6 @@ import org.apache.kafka.connect.data.Schema;
|
|||
import org.apache.kafka.connect.data.Struct;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
@ -53,7 +52,7 @@ public class SingleFieldPath {
|
|||
this.version = version;
|
||||
switch (version) {
|
||||
case V1: // backward compatibility
|
||||
this.steps = Collections.singletonList(pathText);
|
||||
this.steps = List.of(pathText);
|
||||
break;
|
||||
case V2:
|
||||
this.steps = buildFieldPathV2(pathText);
|
||||
|
@ -134,7 +133,7 @@ public class SingleFieldPath {
|
|||
// add last step if last char is a dot
|
||||
if (!path.isEmpty() && path.charAt(path.length() - 1) == DOT)
|
||||
steps.add("");
|
||||
return Collections.unmodifiableList(steps);
|
||||
return List.copyOf(steps);
|
||||
}
|
||||
|
||||
private static void failWhenIncompleteBacktickPair(String path, int backtickAt) {
|
||||
|
|
|
@ -39,7 +39,6 @@ import org.junit.jupiter.params.provider.MethodSource;
|
|||
import java.math.BigDecimal;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
@ -72,38 +71,38 @@ public class CastTest {
|
|||
|
||||
@Test
|
||||
public void testConfigEmpty() {
|
||||
assertThrows(ConfigException.class, () -> xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "")));
|
||||
assertThrows(ConfigException.class, () -> xformKey.configure(Map.of(Cast.SPEC_CONFIG, "")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConfigInvalidSchemaType() {
|
||||
assertThrows(ConfigException.class, () -> xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:faketype")));
|
||||
assertThrows(ConfigException.class, () -> xformKey.configure(Map.of(Cast.SPEC_CONFIG, "foo:faketype")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConfigInvalidTargetType() {
|
||||
assertThrows(ConfigException.class, () -> xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:array")));
|
||||
assertThrows(ConfigException.class, () -> xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "array")));
|
||||
assertThrows(ConfigException.class, () -> xformKey.configure(Map.of(Cast.SPEC_CONFIG, "foo:array")));
|
||||
assertThrows(ConfigException.class, () -> xformKey.configure(Map.of(Cast.SPEC_CONFIG, "array")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUnsupportedTargetType() {
|
||||
assertThrows(ConfigException.class, () -> xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:bytes")));
|
||||
assertThrows(ConfigException.class, () -> xformKey.configure(Map.of(Cast.SPEC_CONFIG, "foo:bytes")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConfigInvalidMap() {
|
||||
assertThrows(ConfigException.class, () -> xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int8:extra")));
|
||||
assertThrows(ConfigException.class, () -> xformKey.configure(Map.of(Cast.SPEC_CONFIG, "foo:int8:extra")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConfigMixWholeAndFieldTransformation() {
|
||||
assertThrows(ConfigException.class, () -> xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int8,int32")));
|
||||
assertThrows(ConfigException.class, () -> xformKey.configure(Map.of(Cast.SPEC_CONFIG, "foo:int8,int32")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void castNullValueRecordWithSchema() {
|
||||
xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int64"));
|
||||
xformValue.configure(Map.of(Cast.SPEC_CONFIG, "foo:int64"));
|
||||
SourceRecord original = new SourceRecord(null, null, "topic", 0,
|
||||
Schema.STRING_SCHEMA, "key", Schema.STRING_SCHEMA, null);
|
||||
SourceRecord transformed = xformValue.apply(original);
|
||||
|
@ -129,7 +128,7 @@ public class CastTest {
|
|||
|
||||
@Test
|
||||
public void castNullValueRecordSchemaless() {
|
||||
xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int64"));
|
||||
xformValue.configure(Map.of(Cast.SPEC_CONFIG, "foo:int64"));
|
||||
SourceRecord original = new SourceRecord(null, null, "topic", 0,
|
||||
Schema.STRING_SCHEMA, "key", null, null);
|
||||
SourceRecord transformed = xformValue.apply(original);
|
||||
|
@ -138,7 +137,7 @@ public class CastTest {
|
|||
|
||||
@Test
|
||||
public void castNullKeyRecordWithSchema() {
|
||||
xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int64"));
|
||||
xformKey.configure(Map.of(Cast.SPEC_CONFIG, "foo:int64"));
|
||||
SourceRecord original = new SourceRecord(null, null, "topic", 0,
|
||||
Schema.STRING_SCHEMA, null, Schema.STRING_SCHEMA, "value");
|
||||
SourceRecord transformed = xformKey.apply(original);
|
||||
|
@ -147,7 +146,7 @@ public class CastTest {
|
|||
|
||||
@Test
|
||||
public void castNullKeyRecordSchemaless() {
|
||||
xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "foo:int64"));
|
||||
xformKey.configure(Map.of(Cast.SPEC_CONFIG, "foo:int64"));
|
||||
SourceRecord original = new SourceRecord(null, null, "topic", 0,
|
||||
null, null, Schema.STRING_SCHEMA, "value");
|
||||
SourceRecord transformed = xformKey.apply(original);
|
||||
|
@ -156,7 +155,7 @@ public class CastTest {
|
|||
|
||||
@Test
|
||||
public void castWholeRecordKeyWithSchema() {
|
||||
xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
|
||||
xformKey.configure(Map.of(Cast.SPEC_CONFIG, "int8"));
|
||||
SourceRecord transformed = xformKey.apply(new SourceRecord(null, null, "topic", 0,
|
||||
Schema.INT32_SCHEMA, 42, Schema.STRING_SCHEMA, "bogus"));
|
||||
|
||||
|
@ -166,7 +165,7 @@ public class CastTest {
|
|||
|
||||
@Test
|
||||
public void castWholeRecordValueWithSchemaInt8() {
|
||||
xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
|
||||
xformValue.configure(Map.of(Cast.SPEC_CONFIG, "int8"));
|
||||
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
|
||||
Schema.INT32_SCHEMA, 42));
|
||||
|
||||
|
@ -176,7 +175,7 @@ public class CastTest {
|
|||
|
||||
@Test
|
||||
public void castWholeRecordValueWithSchemaInt16() {
|
||||
xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int16"));
|
||||
xformValue.configure(Map.of(Cast.SPEC_CONFIG, "int16"));
|
||||
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
|
||||
Schema.INT32_SCHEMA, 42));
|
||||
|
||||
|
@ -186,7 +185,7 @@ public class CastTest {
|
|||
|
||||
@Test
|
||||
public void castWholeRecordValueWithSchemaInt32() {
|
||||
xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int32"));
|
||||
xformValue.configure(Map.of(Cast.SPEC_CONFIG, "int32"));
|
||||
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
|
||||
Schema.INT32_SCHEMA, 42));
|
||||
|
||||
|
@ -196,7 +195,7 @@ public class CastTest {
|
|||
|
||||
@Test
|
||||
public void castWholeRecordValueWithSchemaInt64() {
|
||||
xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int64"));
|
||||
xformValue.configure(Map.of(Cast.SPEC_CONFIG, "int64"));
|
||||
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
|
||||
Schema.INT32_SCHEMA, 42));
|
||||
|
||||
|
@ -206,7 +205,7 @@ public class CastTest {
|
|||
|
||||
@Test
|
||||
public void castWholeRecordValueWithSchemaFloat32() {
|
||||
xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float32"));
|
||||
xformValue.configure(Map.of(Cast.SPEC_CONFIG, "float32"));
|
||||
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
|
||||
Schema.INT32_SCHEMA, 42));
|
||||
|
||||
|
@ -216,7 +215,7 @@ public class CastTest {
|
|||
|
||||
@Test
|
||||
public void castWholeRecordValueWithSchemaFloat64() {
|
||||
xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float64"));
|
||||
xformValue.configure(Map.of(Cast.SPEC_CONFIG, "float64"));
|
||||
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
|
||||
Schema.INT32_SCHEMA, 42));
|
||||
|
||||
|
@ -226,7 +225,7 @@ public class CastTest {
|
|||
|
||||
@Test
|
||||
public void castWholeRecordValueWithSchemaBooleanTrue() {
|
||||
xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean"));
|
||||
xformValue.configure(Map.of(Cast.SPEC_CONFIG, "boolean"));
|
||||
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
|
||||
Schema.INT32_SCHEMA, 42));
|
||||
|
||||
|
@ -236,7 +235,7 @@ public class CastTest {
|
|||
|
||||
@Test
|
||||
public void castWholeRecordValueWithSchemaBooleanFalse() {
|
||||
xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean"));
|
||||
xformValue.configure(Map.of(Cast.SPEC_CONFIG, "boolean"));
|
||||
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
|
||||
Schema.INT32_SCHEMA, 0));
|
||||
|
||||
|
@ -246,7 +245,7 @@ public class CastTest {
|
|||
|
||||
@Test
|
||||
public void castWholeRecordValueWithSchemaString() {
|
||||
xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "string"));
|
||||
xformValue.configure(Map.of(Cast.SPEC_CONFIG, "string"));
|
||||
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
|
||||
Schema.INT32_SCHEMA, 42));
|
||||
|
||||
|
@ -257,7 +256,7 @@ public class CastTest {
|
|||
@Test
|
||||
public void castWholeBigDecimalRecordValueWithSchemaString() {
|
||||
BigDecimal bigDecimal = new BigDecimal(42);
|
||||
xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "string"));
|
||||
xformValue.configure(Map.of(Cast.SPEC_CONFIG, "string"));
|
||||
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
|
||||
Decimal.schema(bigDecimal.scale()), bigDecimal));
|
||||
|
||||
|
@ -268,7 +267,7 @@ public class CastTest {
|
|||
@Test
|
||||
public void castWholeDateRecordValueWithSchemaString() {
|
||||
Date timestamp = new Date(MILLIS_PER_DAY + 1); // day + 1msec to get a timestamp formatting.
|
||||
xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "string"));
|
||||
xformValue.configure(Map.of(Cast.SPEC_CONFIG, "string"));
|
||||
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
|
||||
Timestamp.SCHEMA, timestamp));
|
||||
|
||||
|
@ -279,7 +278,7 @@ public class CastTest {
|
|||
@Test
|
||||
public void castWholeRecordDefaultValue() {
|
||||
// Validate default value in schema is correctly converted
|
||||
xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int32"));
|
||||
xformValue.configure(Map.of(Cast.SPEC_CONFIG, "int32"));
|
||||
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
|
||||
SchemaBuilder.float32().defaultValue(-42.125f).build(), 42.125f));
|
||||
|
||||
|
@ -290,7 +289,7 @@ public class CastTest {
|
|||
|
||||
@Test
|
||||
public void castWholeRecordKeySchemaless() {
|
||||
xformKey.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
|
||||
xformKey.configure(Map.of(Cast.SPEC_CONFIG, "int8"));
|
||||
SourceRecord transformed = xformKey.apply(new SourceRecord(null, null, "topic", 0,
|
||||
null, 42, Schema.STRING_SCHEMA, "bogus"));
|
||||
|
||||
|
@ -300,7 +299,7 @@ public class CastTest {
|
|||
|
||||
@Test
|
||||
public void castWholeRecordValueSchemalessInt8() {
|
||||
xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
|
||||
xformValue.configure(Map.of(Cast.SPEC_CONFIG, "int8"));
|
||||
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
|
||||
null, 42));
|
||||
|
||||
|
@ -310,7 +309,7 @@ public class CastTest {
|
|||
|
||||
@Test
|
||||
public void castWholeRecordValueSchemalessInt16() {
|
||||
xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int16"));
|
||||
xformValue.configure(Map.of(Cast.SPEC_CONFIG, "int16"));
|
||||
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
|
||||
null, 42));
|
||||
|
||||
|
@ -320,7 +319,7 @@ public class CastTest {
|
|||
|
||||
@Test
|
||||
public void castWholeRecordValueSchemalessInt32() {
|
||||
xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int32"));
|
||||
xformValue.configure(Map.of(Cast.SPEC_CONFIG, "int32"));
|
||||
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
|
||||
null, 42));
|
||||
|
||||
|
@ -330,7 +329,7 @@ public class CastTest {
|
|||
|
||||
@Test
|
||||
public void castWholeRecordValueSchemalessInt64() {
|
||||
xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int64"));
|
||||
xformValue.configure(Map.of(Cast.SPEC_CONFIG, "int64"));
|
||||
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
|
||||
null, 42));
|
||||
|
||||
|
@ -340,7 +339,7 @@ public class CastTest {
|
|||
|
||||
@Test
|
||||
public void castWholeRecordValueSchemalessFloat32() {
|
||||
xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float32"));
|
||||
xformValue.configure(Map.of(Cast.SPEC_CONFIG, "float32"));
|
||||
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
|
||||
null, 42));
|
||||
|
||||
|
@ -350,7 +349,7 @@ public class CastTest {
|
|||
|
||||
@Test
|
||||
public void castWholeRecordValueSchemalessFloat64() {
|
||||
xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "float64"));
|
||||
xformValue.configure(Map.of(Cast.SPEC_CONFIG, "float64"));
|
||||
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
|
||||
null, 42));
|
||||
|
||||
|
@ -360,7 +359,7 @@ public class CastTest {
|
|||
|
||||
@Test
|
||||
public void castWholeRecordValueSchemalessBooleanTrue() {
|
||||
xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean"));
|
||||
xformValue.configure(Map.of(Cast.SPEC_CONFIG, "boolean"));
|
||||
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
|
||||
null, 42));
|
||||
|
||||
|
@ -370,7 +369,7 @@ public class CastTest {
|
|||
|
||||
@Test
|
||||
public void castWholeRecordValueSchemalessBooleanFalse() {
|
||||
xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "boolean"));
|
||||
xformValue.configure(Map.of(Cast.SPEC_CONFIG, "boolean"));
|
||||
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
|
||||
null, 0));
|
||||
|
||||
|
@ -380,7 +379,7 @@ public class CastTest {
|
|||
|
||||
@Test
|
||||
public void castWholeRecordValueSchemalessString() {
|
||||
xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "string"));
|
||||
xformValue.configure(Map.of(Cast.SPEC_CONFIG, "string"));
|
||||
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", 0,
|
||||
null, 42));
|
||||
|
||||
|
@ -390,15 +389,15 @@ public class CastTest {
|
|||
|
||||
@Test
|
||||
public void castWholeRecordValueSchemalessUnsupportedType() {
|
||||
xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8"));
|
||||
xformValue.configure(Map.of(Cast.SPEC_CONFIG, "int8"));
|
||||
assertThrows(DataException.class,
|
||||
() -> xformValue.apply(new SourceRecord(null, null, "topic", 0,
|
||||
null, Collections.singletonList("foo"))));
|
||||
null, List.of("foo"))));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void castLogicalToPrimitive() {
|
||||
List<String> specParts = Arrays.asList(
|
||||
List<String> specParts = List.of(
|
||||
"date_to_int32:int32", // Cast to underlying representation
|
||||
"timestamp_to_int64:int64", // Cast to underlying representation
|
||||
"time_to_int64:int64", // Cast to wider datatype than underlying representation
|
||||
|
@ -408,7 +407,7 @@ public class CastTest {
|
|||
);
|
||||
|
||||
Date day = new Date(MILLIS_PER_DAY);
|
||||
xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG,
|
||||
xformValue.configure(Map.of(Cast.SPEC_CONFIG,
|
||||
String.join(",", specParts)));
|
||||
|
||||
SchemaBuilder builder = SchemaBuilder.struct();
|
||||
|
@ -455,7 +454,7 @@ public class CastTest {
|
|||
Date time = new Date(MILLIS_PER_HOUR);
|
||||
Date timestamp = new Date();
|
||||
|
||||
xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG,
|
||||
xformValue.configure(Map.of(Cast.SPEC_CONFIG,
|
||||
"date:string,decimal:string,time:string,timestamp:string"));
|
||||
|
||||
SchemaBuilder builder = SchemaBuilder.struct();
|
||||
|
@ -494,7 +493,7 @@ public class CastTest {
|
|||
byte[] byteArray = new byte[] {(byte) 0xFE, (byte) 0xDC, (byte) 0xBA, (byte) 0x98, 0x76, 0x54, 0x32, 0x10};
|
||||
ByteBuffer byteBuffer = ByteBuffer.wrap(Arrays.copyOf(byteArray, byteArray.length));
|
||||
|
||||
xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG,
|
||||
xformValue.configure(Map.of(Cast.SPEC_CONFIG,
|
||||
"int8:int16,int16:int32,int32:int64,int64:boolean,float32:float64,float64:boolean,boolean:int8,string:int32,bigdecimal:string,date:string,optional:int32,bytes:string,byteArray:string"));
|
||||
|
||||
// Include an optional fields and fields with defaults to validate their values are passed through properly
|
||||
|
@ -578,7 +577,7 @@ public class CastTest {
|
|||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void castFieldsSchemaless() {
|
||||
xformValue.configure(Collections.singletonMap(Cast.SPEC_CONFIG, "int8:int16,int16:int32,int32:int64,int64:boolean,float32:float64,float64:boolean,boolean:int8,string:int32"));
|
||||
xformValue.configure(Map.of(Cast.SPEC_CONFIG, "int8:int16,int16:int32,int32:int64,int64:boolean,float32:float64,float64:boolean,boolean:int8,string:int32"));
|
||||
Map<String, Object> recordValue = new HashMap<>();
|
||||
recordValue.put("int8", (byte) 8);
|
||||
recordValue.put("int16", (short) 16);
|
||||
|
|
|
@ -25,10 +25,9 @@ import org.apache.kafka.connect.source.SourceRecord;
|
|||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static java.util.Arrays.asList;
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
|
@ -38,7 +37,7 @@ public class DropHeadersTest {
|
|||
|
||||
private Map<String, ?> config(String... headers) {
|
||||
Map<String, Object> result = new HashMap<>();
|
||||
result.put(DropHeaders.HEADERS_FIELD, asList(headers));
|
||||
result.put(DropHeaders.HEADERS_FIELD, List.of(headers));
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -106,8 +105,8 @@ public class DropHeadersTest {
|
|||
}
|
||||
|
||||
private SourceRecord sourceRecord(ConnectHeaders headers) {
|
||||
Map<String, ?> sourcePartition = singletonMap("foo", "bar");
|
||||
Map<String, ?> sourceOffset = singletonMap("baz", "quxx");
|
||||
Map<String, ?> sourcePartition = Map.of("foo", "bar");
|
||||
Map<String, ?> sourceOffset = Map.of("baz", "quxx");
|
||||
String topic = "topic";
|
||||
Integer partition = 0;
|
||||
Schema keySchema = null;
|
||||
|
|
|
@ -29,7 +29,6 @@ import org.junit.jupiter.params.ParameterizedTest;
|
|||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Stream;
|
||||
|
@ -57,9 +56,9 @@ public class ExtractFieldTest {
|
|||
|
||||
@Test
|
||||
public void schemaless() {
|
||||
xformKey.configure(Collections.singletonMap("field", "magic"));
|
||||
xformKey.configure(Map.of("field", "magic"));
|
||||
|
||||
final SinkRecord record = new SinkRecord("test", 0, null, Collections.singletonMap("magic", 42), null, null, 0);
|
||||
final SinkRecord record = new SinkRecord("test", 0, null, Map.of("magic", 42), null, null, 0);
|
||||
final SinkRecord transformedRecord = xformKey.apply(record);
|
||||
|
||||
assertNull(transformedRecord.keySchema());
|
||||
|
@ -73,7 +72,7 @@ public class ExtractFieldTest {
|
|||
configs.put("field", "magic.foo");
|
||||
xformKey.configure(configs);
|
||||
|
||||
final Map<String, Object> key = Collections.singletonMap("magic", Collections.singletonMap("foo", 42));
|
||||
final Map<String, Object> key = Map.of("magic", Map.of("foo", 42));
|
||||
final SinkRecord record = new SinkRecord("test", 0, null, key, null, null, 0);
|
||||
final SinkRecord transformedRecord = xformKey.apply(record);
|
||||
|
||||
|
@ -83,7 +82,7 @@ public class ExtractFieldTest {
|
|||
|
||||
@Test
|
||||
public void nullSchemaless() {
|
||||
xformKey.configure(Collections.singletonMap("field", "magic"));
|
||||
xformKey.configure(Map.of("field", "magic"));
|
||||
|
||||
final Map<String, Object> key = null;
|
||||
final SinkRecord record = new SinkRecord("test", 0, null, key, null, null, 0);
|
||||
|
@ -95,7 +94,7 @@ public class ExtractFieldTest {
|
|||
|
||||
@Test
|
||||
public void withSchema() {
|
||||
xformKey.configure(Collections.singletonMap("field", "magic"));
|
||||
xformKey.configure(Map.of("field", "magic"));
|
||||
|
||||
final Schema keySchema = SchemaBuilder.struct().field("magic", Schema.INT32_SCHEMA).build();
|
||||
final Struct key = new Struct(keySchema).put("magic", 42);
|
||||
|
@ -125,7 +124,7 @@ public class ExtractFieldTest {
|
|||
|
||||
@Test
|
||||
public void testNullWithSchema() {
|
||||
xformKey.configure(Collections.singletonMap("field", "magic"));
|
||||
xformKey.configure(Map.of("field", "magic"));
|
||||
|
||||
final Schema keySchema = SchemaBuilder.struct().field("magic", Schema.INT32_SCHEMA).optional().build();
|
||||
final Struct key = null;
|
||||
|
@ -138,9 +137,9 @@ public class ExtractFieldTest {
|
|||
|
||||
@Test
|
||||
public void nonExistentFieldSchemalessShouldReturnNull() {
|
||||
xformKey.configure(Collections.singletonMap("field", "nonexistent"));
|
||||
xformKey.configure(Map.of("field", "nonexistent"));
|
||||
|
||||
final SinkRecord record = new SinkRecord("test", 0, null, Collections.singletonMap("magic", 42), null, null, 0);
|
||||
final SinkRecord record = new SinkRecord("test", 0, null, Map.of("magic", 42), null, null, 0);
|
||||
final SinkRecord transformedRecord = xformKey.apply(record);
|
||||
|
||||
assertNull(transformedRecord.keySchema());
|
||||
|
@ -154,7 +153,7 @@ public class ExtractFieldTest {
|
|||
configs.put("field", "magic.nonexistent");
|
||||
xformKey.configure(configs);
|
||||
|
||||
final Map<String, Object> key = Collections.singletonMap("magic", Collections.singletonMap("foo", 42));
|
||||
final Map<String, Object> key = Map.of("magic", Map.of("foo", 42));
|
||||
final SinkRecord record = new SinkRecord("test", 0, null, key, null, null, 0);
|
||||
final SinkRecord transformedRecord = xformKey.apply(record);
|
||||
|
||||
|
@ -164,7 +163,7 @@ public class ExtractFieldTest {
|
|||
|
||||
@Test
|
||||
public void nonExistentFieldWithSchemaShouldFail() {
|
||||
xformKey.configure(Collections.singletonMap("field", "nonexistent"));
|
||||
xformKey.configure(Map.of("field", "nonexistent"));
|
||||
|
||||
final Schema keySchema = SchemaBuilder.struct().field("magic", Schema.INT32_SCHEMA).build();
|
||||
final Struct key = new Struct(keySchema).put("magic", 42);
|
||||
|
|
|
@ -27,10 +27,9 @@ import org.apache.kafka.connect.source.SourceRecord;
|
|||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
|
||||
|
@ -52,21 +51,21 @@ public class FlattenTest {
|
|||
|
||||
@Test
|
||||
public void topLevelStructRequired() {
|
||||
xformValue.configure(Collections.emptyMap());
|
||||
xformValue.configure(Map.of());
|
||||
assertThrows(DataException.class, () -> xformValue.apply(new SourceRecord(null, null,
|
||||
"topic", 0, Schema.INT32_SCHEMA, 42)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void topLevelMapRequired() {
|
||||
xformValue.configure(Collections.emptyMap());
|
||||
xformValue.configure(Map.of());
|
||||
assertThrows(DataException.class, () -> xformValue.apply(new SourceRecord(null, null,
|
||||
"topic", 0, null, 42)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNestedStruct() {
|
||||
xformValue.configure(Collections.emptyMap());
|
||||
xformValue.configure(Map.of());
|
||||
|
||||
SchemaBuilder builder = SchemaBuilder.struct();
|
||||
builder.field("int8", Schema.INT8_SCHEMA);
|
||||
|
@ -125,7 +124,7 @@ public class FlattenTest {
|
|||
|
||||
@Test
|
||||
public void testNestedMapWithDelimiter() {
|
||||
xformValue.configure(Collections.singletonMap("delimiter", "#"));
|
||||
xformValue.configure(Map.of("delimiter", "#"));
|
||||
|
||||
Map<String, Object> supportedTypes = new HashMap<>();
|
||||
supportedTypes.put("int8", (byte) 8);
|
||||
|
@ -138,8 +137,8 @@ public class FlattenTest {
|
|||
supportedTypes.put("string", "stringy");
|
||||
supportedTypes.put("bytes", "bytes".getBytes());
|
||||
|
||||
Map<String, Object> oneLevelNestedMap = Collections.singletonMap("B", supportedTypes);
|
||||
Map<String, Object> twoLevelNestedMap = Collections.singletonMap("A", oneLevelNestedMap);
|
||||
Map<String, Object> oneLevelNestedMap = Map.of("B", supportedTypes);
|
||||
Map<String, Object> twoLevelNestedMap = Map.of("A", oneLevelNestedMap);
|
||||
|
||||
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null,
|
||||
"topic", 0,
|
||||
|
@ -163,7 +162,7 @@ public class FlattenTest {
|
|||
|
||||
@Test
|
||||
public void testOptionalFieldStruct() {
|
||||
xformValue.configure(Collections.emptyMap());
|
||||
xformValue.configure(Map.of());
|
||||
|
||||
SchemaBuilder builder = SchemaBuilder.struct();
|
||||
builder.field("opt_int32", Schema.OPTIONAL_INT32_SCHEMA);
|
||||
|
@ -190,7 +189,7 @@ public class FlattenTest {
|
|||
|
||||
@Test
|
||||
public void testOptionalStruct() {
|
||||
xformValue.configure(Collections.emptyMap());
|
||||
xformValue.configure(Map.of());
|
||||
|
||||
SchemaBuilder builder = SchemaBuilder.struct().optional();
|
||||
builder.field("opt_int32", Schema.OPTIONAL_INT32_SCHEMA);
|
||||
|
@ -206,7 +205,7 @@ public class FlattenTest {
|
|||
|
||||
@Test
|
||||
public void testOptionalNestedStruct() {
|
||||
xformValue.configure(Collections.emptyMap());
|
||||
xformValue.configure(Map.of());
|
||||
|
||||
SchemaBuilder builder = SchemaBuilder.struct().optional();
|
||||
builder.field("opt_int32", Schema.OPTIONAL_INT32_SCHEMA);
|
||||
|
@ -230,12 +229,12 @@ public class FlattenTest {
|
|||
|
||||
@Test
|
||||
public void testOptionalFieldMap() {
|
||||
xformValue.configure(Collections.emptyMap());
|
||||
xformValue.configure(Map.of());
|
||||
|
||||
Map<String, Object> supportedTypes = new HashMap<>();
|
||||
supportedTypes.put("opt_int32", null);
|
||||
|
||||
Map<String, Object> oneLevelNestedMap = Collections.singletonMap("B", supportedTypes);
|
||||
Map<String, Object> oneLevelNestedMap = Map.of("B", supportedTypes);
|
||||
|
||||
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null,
|
||||
"topic", 0,
|
||||
|
@ -251,9 +250,9 @@ public class FlattenTest {
|
|||
|
||||
@Test
|
||||
public void testKey() {
|
||||
xformKey.configure(Collections.emptyMap());
|
||||
xformKey.configure(Map.of());
|
||||
|
||||
Map<String, Map<String, Integer>> key = Collections.singletonMap("A", Collections.singletonMap("B", 12));
|
||||
Map<String, Map<String, Integer>> key = Map.of("A", Map.of("B", 12));
|
||||
SourceRecord src = new SourceRecord(null, null, "topic", null, key, null, null);
|
||||
SourceRecord transformed = xformKey.apply(src);
|
||||
|
||||
|
@ -266,14 +265,14 @@ public class FlattenTest {
|
|||
|
||||
@Test
|
||||
public void testSchemalessArray() {
|
||||
xformValue.configure(Collections.emptyMap());
|
||||
Object value = Collections.singletonMap("foo", Arrays.asList("bar", Collections.singletonMap("baz", Collections.singletonMap("lfg", "lfg"))));
|
||||
xformValue.configure(Map.of());
|
||||
Object value = Map.of("foo", List.of("bar", Map.of("baz", Map.of("lfg", "lfg"))));
|
||||
assertEquals(value, xformValue.apply(new SourceRecord(null, null, "topic", null, null, null, value)).value());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testArrayWithSchema() {
|
||||
xformValue.configure(Collections.emptyMap());
|
||||
xformValue.configure(Map.of());
|
||||
Schema nestedStructSchema = SchemaBuilder.struct().field("lfg", Schema.STRING_SCHEMA).build();
|
||||
Schema innerStructSchema = SchemaBuilder.struct().field("baz", nestedStructSchema).build();
|
||||
Schema structSchema = SchemaBuilder.struct()
|
||||
|
@ -284,7 +283,7 @@ public class FlattenTest {
|
|||
Struct innerValue = new Struct(innerStructSchema);
|
||||
innerValue.put("baz", nestedValue);
|
||||
Struct value = new Struct(structSchema);
|
||||
value.put("foo", Collections.singletonList(innerValue));
|
||||
value.put("foo", List.of(innerValue));
|
||||
SourceRecord transformed = xformValue.apply(new SourceRecord(null, null, "topic", null, null, structSchema, value));
|
||||
assertEquals(value, transformed.value());
|
||||
assertEquals(structSchema, transformed.valueSchema());
|
||||
|
@ -296,7 +295,7 @@ public class FlattenTest {
|
|||
// children should also be optional. Similarly, if the parent Struct has a default value, the default value for
|
||||
// the flattened field
|
||||
|
||||
xformValue.configure(Collections.emptyMap());
|
||||
xformValue.configure(Map.of());
|
||||
|
||||
SchemaBuilder builder = SchemaBuilder.struct().optional();
|
||||
builder.field("req_field", Schema.STRING_SCHEMA);
|
||||
|
@ -325,7 +324,7 @@ public class FlattenTest {
|
|||
|
||||
@Test
|
||||
public void tombstoneEventWithoutSchemaShouldPassThrough() {
|
||||
xformValue.configure(Collections.emptyMap());
|
||||
xformValue.configure(Map.of());
|
||||
|
||||
final SourceRecord record = new SourceRecord(null, null, "test", 0,
|
||||
null, null);
|
||||
|
@ -337,7 +336,7 @@ public class FlattenTest {
|
|||
|
||||
@Test
|
||||
public void tombstoneEventWithSchemaShouldPassThrough() {
|
||||
xformValue.configure(Collections.emptyMap());
|
||||
xformValue.configure(Map.of());
|
||||
|
||||
final Schema simpleStructSchema = SchemaBuilder.struct().name("name").version(1).doc("doc").field("magic", Schema.OPTIONAL_INT64_SCHEMA).build();
|
||||
final SourceRecord record = new SourceRecord(null, null, "test", 0,
|
||||
|
@ -350,7 +349,7 @@ public class FlattenTest {
|
|||
|
||||
@Test
|
||||
public void testMapWithNullFields() {
|
||||
xformValue.configure(Collections.emptyMap());
|
||||
xformValue.configure(Map.of());
|
||||
|
||||
// Use a LinkedHashMap to ensure the SMT sees entries in a specific order
|
||||
Map<String, Object> value = new LinkedHashMap<>();
|
||||
|
@ -368,7 +367,7 @@ public class FlattenTest {
|
|||
|
||||
@Test
|
||||
public void testStructWithNullFields() {
|
||||
xformValue.configure(Collections.emptyMap());
|
||||
xformValue.configure(Map.of());
|
||||
|
||||
final Schema structSchema = SchemaBuilder.struct()
|
||||
.field("firstNull", Schema.OPTIONAL_STRING_SCHEMA)
|
||||
|
|
|
@ -36,10 +36,6 @@ import java.util.HashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static java.util.Arrays.asList;
|
||||
import static java.util.Collections.emptyList;
|
||||
import static java.util.Collections.singletonList;
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static org.apache.kafka.connect.data.Schema.STRING_SCHEMA;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
@ -103,8 +99,8 @@ public class HeaderFromTest {
|
|||
}
|
||||
|
||||
private SourceRecord sourceRecord(boolean keyTransform, Schema keyOrValueSchema, Object keyOrValue) {
|
||||
Map<String, ?> sourcePartition = singletonMap("foo", "bar");
|
||||
Map<String, ?> sourceOffset = singletonMap("baz", "quxx");
|
||||
Map<String, ?> sourcePartition = Map.of("foo", "bar");
|
||||
Map<String, ?> sourceOffset = Map.of("baz", "quxx");
|
||||
String topic = "topic";
|
||||
Integer partition = 0;
|
||||
Long timestamp = 0L;
|
||||
|
@ -140,7 +136,7 @@ public class HeaderFromTest {
|
|||
|
||||
List<Arguments> result = new ArrayList<>();
|
||||
|
||||
for (Boolean testKeyTransform : asList(true, false)) {
|
||||
for (Boolean testKeyTransform : List.of(true, false)) {
|
||||
result.add(
|
||||
Arguments.of(
|
||||
"basic copy",
|
||||
|
@ -149,7 +145,7 @@ public class HeaderFromTest {
|
|||
.withField("field1", STRING_SCHEMA, "field1-value")
|
||||
.withField("field2", STRING_SCHEMA, "field2-value")
|
||||
.addHeader("header1", STRING_SCHEMA, "existing-value"),
|
||||
singletonList("field1"), singletonList("inserted1"), HeaderFrom.Operation.COPY, true,
|
||||
List.of("field1"), List.of("inserted1"), HeaderFrom.Operation.COPY, true,
|
||||
new RecordBuilder()
|
||||
.withField("field1", STRING_SCHEMA, "field1-value")
|
||||
.withField("field2", STRING_SCHEMA, "field2-value")
|
||||
|
@ -164,7 +160,7 @@ public class HeaderFromTest {
|
|||
.withField("field1", STRING_SCHEMA, "field1-value")
|
||||
.withField("field2", STRING_SCHEMA, "field2-value")
|
||||
.addHeader("header1", STRING_SCHEMA, "existing-value"),
|
||||
singletonList("field1"), singletonList("inserted1"), HeaderFrom.Operation.MOVE, true,
|
||||
List.of("field1"), List.of("inserted1"), HeaderFrom.Operation.MOVE, true,
|
||||
new RecordBuilder()
|
||||
// field1 got moved
|
||||
.withField("field2", STRING_SCHEMA, "field2-value")
|
||||
|
@ -179,7 +175,7 @@ public class HeaderFromTest {
|
|||
.withField("field1", STRING_SCHEMA, "field1-value")
|
||||
.withField("field2", STRING_SCHEMA, "field2-value")
|
||||
.addHeader("inserted1", STRING_SCHEMA, "existing-value"),
|
||||
singletonList("field1"), singletonList("inserted1"), HeaderFrom.Operation.COPY, true,
|
||||
List.of("field1"), List.of("inserted1"), HeaderFrom.Operation.COPY, true,
|
||||
new RecordBuilder()
|
||||
.withField("field1", STRING_SCHEMA, "field1-value")
|
||||
.withField("field2", STRING_SCHEMA, "field2-value")
|
||||
|
@ -194,7 +190,7 @@ public class HeaderFromTest {
|
|||
.withField("field1", STRING_SCHEMA, "field1-value")
|
||||
.withField("field2", STRING_SCHEMA, "field2-value")
|
||||
.addHeader("inserted1", STRING_SCHEMA, "existing-value"),
|
||||
singletonList("field1"), singletonList("inserted1"), HeaderFrom.Operation.MOVE, true,
|
||||
List.of("field1"), List.of("inserted1"), HeaderFrom.Operation.MOVE, true,
|
||||
new RecordBuilder()
|
||||
// field1 got moved
|
||||
.withField("field2", STRING_SCHEMA, "field2-value")
|
||||
|
@ -211,7 +207,7 @@ public class HeaderFromTest {
|
|||
.withField("field1", schema, struct)
|
||||
.withField("field2", STRING_SCHEMA, "field2-value")
|
||||
.addHeader("header1", STRING_SCHEMA, "existing-value"),
|
||||
singletonList("field1"), singletonList("inserted1"), HeaderFrom.Operation.COPY, true,
|
||||
List.of("field1"), List.of("inserted1"), HeaderFrom.Operation.COPY, true,
|
||||
new RecordBuilder()
|
||||
.withField("field1", schema, struct)
|
||||
.withField("field2", STRING_SCHEMA, "field2-value")
|
||||
|
@ -226,7 +222,7 @@ public class HeaderFromTest {
|
|||
.withField("field1", schema, struct)
|
||||
.withField("field2", STRING_SCHEMA, "field2-value")
|
||||
.addHeader("header1", STRING_SCHEMA, "existing-value"),
|
||||
singletonList("field1"), singletonList("inserted1"), HeaderFrom.Operation.MOVE, true,
|
||||
List.of("field1"), List.of("inserted1"), HeaderFrom.Operation.MOVE, true,
|
||||
new RecordBuilder()
|
||||
// field1 got moved
|
||||
.withField("field2", STRING_SCHEMA, "field2-value")
|
||||
|
@ -242,7 +238,7 @@ public class HeaderFromTest {
|
|||
.withField("field2", STRING_SCHEMA, "field2-value")
|
||||
.addHeader("header1", STRING_SCHEMA, "existing-value"),
|
||||
// two headers from the same field
|
||||
asList("field1", "field1"), asList("inserted1", "inserted2"), HeaderFrom.Operation.MOVE, true,
|
||||
List.of("field1", "field1"), List.of("inserted1", "inserted2"), HeaderFrom.Operation.MOVE, true,
|
||||
new RecordBuilder()
|
||||
// field1 got moved
|
||||
.withField("field2", STRING_SCHEMA, "field2-value")
|
||||
|
@ -259,7 +255,7 @@ public class HeaderFromTest {
|
|||
.withField("field2", STRING_SCHEMA, "field2-value")
|
||||
.addHeader("header1", STRING_SCHEMA, "existing-value"),
|
||||
// two headers from the same field
|
||||
asList("field1", "field2"), asList("inserted1", "inserted1"), HeaderFrom.Operation.MOVE, true,
|
||||
List.of("field1", "field2"), List.of("inserted1", "inserted1"), HeaderFrom.Operation.MOVE, true,
|
||||
new RecordBuilder()
|
||||
// field1 and field2 got moved
|
||||
.addHeader("header1", STRING_SCHEMA, "existing-value")
|
||||
|
@ -274,7 +270,7 @@ public class HeaderFromTest {
|
|||
.withField("field1", SchemaBuilder.string().defaultValue("default").optional().build(), "field1-value")
|
||||
.withField("field2", SchemaBuilder.string().defaultValue("default").optional().build(), null)
|
||||
.addHeader("header1", STRING_SCHEMA, "existing-value"),
|
||||
asList("field1", "field2"), asList("inserted1", "inserted2"), HeaderFrom.Operation.COPY, false,
|
||||
List.of("field1", "field2"), List.of("inserted1", "inserted2"), HeaderFrom.Operation.COPY, false,
|
||||
new RecordBuilder()
|
||||
.withField("field1", SchemaBuilder.string().defaultValue("default").optional().build(), "field1-value")
|
||||
.withField("field2", SchemaBuilder.string().defaultValue("default").optional().build(), null)
|
||||
|
@ -290,7 +286,7 @@ public class HeaderFromTest {
|
|||
.withField("field1", SchemaBuilder.string().defaultValue("default").optional().build(), "field1-value")
|
||||
.withField("field2", SchemaBuilder.string().defaultValue("default").optional().build(), null)
|
||||
.addHeader("header1", STRING_SCHEMA, "existing-value"),
|
||||
asList("field1", "field2"), asList("inserted1", "inserted2"), HeaderFrom.Operation.MOVE, false,
|
||||
List.of("field1", "field2"), List.of("inserted1", "inserted2"), HeaderFrom.Operation.MOVE, false,
|
||||
new RecordBuilder()
|
||||
.addHeader("header1", STRING_SCHEMA, "existing-value")
|
||||
.addHeader("inserted1", SchemaBuilder.string().defaultValue("default").optional().build(), "field1-value")
|
||||
|
@ -353,7 +349,7 @@ public class HeaderFromTest {
|
|||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
public void invalidConfigExtraHeaderConfig(boolean keyTransform) {
|
||||
Map<String, Object> config = config(singletonList("foo"), asList("foo", "bar"), HeaderFrom.Operation.COPY, true);
|
||||
Map<String, Object> config = config(List.of("foo"), List.of("foo", "bar"), HeaderFrom.Operation.COPY, true);
|
||||
HeaderFrom<?> xform = keyTransform ? new HeaderFrom.Key<>() : new HeaderFrom.Value<>();
|
||||
assertThrows(ConfigException.class, () -> xform.configure(config));
|
||||
}
|
||||
|
@ -361,7 +357,7 @@ public class HeaderFromTest {
|
|||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
public void invalidConfigExtraFieldConfig(boolean keyTransform) {
|
||||
Map<String, Object> config = config(asList("foo", "bar"), singletonList("foo"), HeaderFrom.Operation.COPY, true);
|
||||
Map<String, Object> config = config(List.of("foo", "bar"), List.of("foo"), HeaderFrom.Operation.COPY, true);
|
||||
HeaderFrom<?> xform = keyTransform ? new HeaderFrom.Key<>() : new HeaderFrom.Value<>();
|
||||
assertThrows(ConfigException.class, () -> xform.configure(config));
|
||||
}
|
||||
|
@ -369,7 +365,7 @@ public class HeaderFromTest {
|
|||
@ParameterizedTest
|
||||
@ValueSource(booleans = {true, false})
|
||||
public void invalidConfigEmptyHeadersAndFieldsConfig(boolean keyTransform) {
|
||||
Map<String, Object> config = config(emptyList(), emptyList(), HeaderFrom.Operation.COPY, true);
|
||||
Map<String, Object> config = config(List.of(), List.of(), HeaderFrom.Operation.COPY, true);
|
||||
HeaderFrom<?> xform = keyTransform ? new HeaderFrom.Key<>() : new HeaderFrom.Value<>();
|
||||
assertThrows(ConfigException.class, () -> xform.configure(config));
|
||||
}
|
||||
|
|
|
@ -24,7 +24,6 @@ import org.apache.kafka.connect.sink.SinkRecord;
|
|||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
|
@ -41,18 +40,18 @@ public class HoistFieldTest {
|
|||
|
||||
@Test
|
||||
public void schemaless() {
|
||||
xform.configure(Collections.singletonMap("field", "magic"));
|
||||
xform.configure(Map.of("field", "magic"));
|
||||
|
||||
final SinkRecord record = new SinkRecord("test", 0, null, 42, null, null, 0);
|
||||
final SinkRecord transformedRecord = xform.apply(record);
|
||||
|
||||
assertNull(transformedRecord.keySchema());
|
||||
assertEquals(Collections.singletonMap("magic", 42), transformedRecord.key());
|
||||
assertEquals(Map.of("magic", 42), transformedRecord.key());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void withSchema() {
|
||||
xform.configure(Collections.singletonMap("field", "magic"));
|
||||
xform.configure(Map.of("field", "magic"));
|
||||
|
||||
final SinkRecord record = new SinkRecord("test", 0, Schema.INT32_SCHEMA, 42, null, null, 0);
|
||||
final SinkRecord transformedRecord = xform.apply(record);
|
||||
|
@ -64,7 +63,7 @@ public class HoistFieldTest {
|
|||
|
||||
@Test
|
||||
public void testSchemalessMapIsMutable() {
|
||||
xform.configure(Collections.singletonMap("field", "magic"));
|
||||
xform.configure(Map.of("field", "magic"));
|
||||
|
||||
final SinkRecord record = new SinkRecord("test", 0, null, 420, null, null, 0);
|
||||
final SinkRecord transformedRecord = xform.apply(record);
|
||||
|
|
|
@ -30,7 +30,6 @@ import org.junit.jupiter.params.ParameterizedTest;
|
|||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
@ -59,7 +58,7 @@ public class InsertFieldTest {
|
|||
|
||||
@Test
|
||||
public void topLevelStructRequired() {
|
||||
xformValue.configure(Collections.singletonMap("topic.field", "topic_field"));
|
||||
xformValue.configure(Map.of("topic.field", "topic_field"));
|
||||
assertThrows(DataException.class,
|
||||
() -> xformValue.apply(new SourceRecord(null, null, "", 0, Schema.INT32_SCHEMA, 42)));
|
||||
}
|
||||
|
@ -118,7 +117,7 @@ public class InsertFieldTest {
|
|||
xformValue.configure(props);
|
||||
|
||||
final SourceRecord record = new SourceRecord(null, null, "test", 0,
|
||||
null, null, null, Collections.singletonMap("magic", 42L), 123L);
|
||||
null, null, null, Map.of("magic", 42L), 123L);
|
||||
|
||||
final SourceRecord transformedRecord = xformValue.apply(record);
|
||||
|
||||
|
@ -183,7 +182,7 @@ public class InsertFieldTest {
|
|||
xformKey.configure(props);
|
||||
|
||||
final SourceRecord record = new SourceRecord(null, null, "test", 0,
|
||||
null, Collections.singletonMap("magic", 42L), null, null);
|
||||
null, Map.of("magic", 42L), null, null);
|
||||
|
||||
final SourceRecord transformedRecord = xformKey.apply(record);
|
||||
|
||||
|
@ -207,7 +206,7 @@ public class InsertFieldTest {
|
|||
xformKey.configure(props);
|
||||
|
||||
final SourceRecord record = new SourceRecord(null, null, "test", 0,
|
||||
null, null, null, Collections.singletonMap("magic", 42L));
|
||||
null, null, null, Map.of("magic", 42L));
|
||||
|
||||
final SourceRecord transformedRecord = xformKey.apply(record);
|
||||
|
||||
|
|
|
@ -28,7 +28,6 @@ import org.junit.jupiter.api.Test;
|
|||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
|
@ -105,8 +104,8 @@ public class InsertHeaderTest {
|
|||
}
|
||||
|
||||
private SourceRecord sourceRecord(ConnectHeaders headers) {
|
||||
Map<String, ?> sourcePartition = singletonMap("foo", "bar");
|
||||
Map<String, ?> sourceOffset = singletonMap("baz", "quxx");
|
||||
Map<String, ?> sourcePartition = Map.of("foo", "bar");
|
||||
Map<String, ?> sourceOffset = Map.of("baz", "quxx");
|
||||
String topic = "topic";
|
||||
Integer partition = 0;
|
||||
Schema keySchema = null;
|
||||
|
|
|
@ -33,73 +33,69 @@ import org.junit.jupiter.api.Test;
|
|||
import java.math.BigDecimal;
|
||||
import java.math.BigInteger;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static java.util.Collections.singletonList;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
public class MaskFieldTest {
|
||||
|
||||
private static final Schema SCHEMA = SchemaBuilder.struct()
|
||||
.field("magic", Schema.INT32_SCHEMA)
|
||||
.field("bool", Schema.BOOLEAN_SCHEMA)
|
||||
.field("byte", Schema.INT8_SCHEMA)
|
||||
.field("short", Schema.INT16_SCHEMA)
|
||||
.field("int", Schema.INT32_SCHEMA)
|
||||
.field("long", Schema.INT64_SCHEMA)
|
||||
.field("float", Schema.FLOAT32_SCHEMA)
|
||||
.field("double", Schema.FLOAT64_SCHEMA)
|
||||
.field("string", Schema.STRING_SCHEMA)
|
||||
.field("date", org.apache.kafka.connect.data.Date.SCHEMA)
|
||||
.field("time", Time.SCHEMA)
|
||||
.field("timestamp", Timestamp.SCHEMA)
|
||||
.field("decimal", Decimal.schema(0))
|
||||
.field("array", SchemaBuilder.array(Schema.INT32_SCHEMA))
|
||||
.field("map", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA))
|
||||
.field("withDefault", SchemaBuilder.string().optional().defaultValue("default").build())
|
||||
.build();
|
||||
private static final Map<String, Object> VALUES = new HashMap<>();
|
||||
private static final Struct VALUES_WITH_SCHEMA = new Struct(SCHEMA);
|
||||
.field("magic", Schema.INT32_SCHEMA)
|
||||
.field("bool", Schema.BOOLEAN_SCHEMA)
|
||||
.field("byte", Schema.INT8_SCHEMA)
|
||||
.field("short", Schema.INT16_SCHEMA)
|
||||
.field("int", Schema.INT32_SCHEMA)
|
||||
.field("long", Schema.INT64_SCHEMA)
|
||||
.field("float", Schema.FLOAT32_SCHEMA)
|
||||
.field("double", Schema.FLOAT64_SCHEMA)
|
||||
.field("string", Schema.STRING_SCHEMA)
|
||||
.field("date", org.apache.kafka.connect.data.Date.SCHEMA)
|
||||
.field("time", Time.SCHEMA)
|
||||
.field("timestamp", Timestamp.SCHEMA)
|
||||
.field("decimal", Decimal.schema(0))
|
||||
.field("array", SchemaBuilder.array(Schema.INT32_SCHEMA))
|
||||
.field("map", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA))
|
||||
.field("withDefault", SchemaBuilder.string().optional().defaultValue("default").build())
|
||||
.build();
|
||||
|
||||
static {
|
||||
VALUES.put("magic", 42);
|
||||
VALUES.put("bool", true);
|
||||
VALUES.put("byte", (byte) 42);
|
||||
VALUES.put("short", (short) 42);
|
||||
VALUES.put("int", 42);
|
||||
VALUES.put("long", 42L);
|
||||
VALUES.put("float", 42f);
|
||||
VALUES.put("double", 42d);
|
||||
VALUES.put("string", "55.121.20.20");
|
||||
VALUES.put("date", new Date());
|
||||
VALUES.put("bigint", new BigInteger("42"));
|
||||
VALUES.put("bigdec", new BigDecimal("42.0"));
|
||||
VALUES.put("list", singletonList(42));
|
||||
VALUES.put("map", Collections.singletonMap("key", "value"));
|
||||
private static final Map<String, Object> VALUES = Map.ofEntries(
|
||||
Map.entry("magic", 42),
|
||||
Map.entry("bool", true),
|
||||
Map.entry("byte", (byte) 42),
|
||||
Map.entry("short", (short) 42),
|
||||
Map.entry("int", 42),
|
||||
Map.entry("long", 42L),
|
||||
Map.entry("float", 42f),
|
||||
Map.entry("double", 42d),
|
||||
Map.entry("string", "55.121.20.20"),
|
||||
Map.entry("date", new Date()),
|
||||
Map.entry("bigint", new BigInteger("42")),
|
||||
Map.entry("bigdec", new BigDecimal("42.0")),
|
||||
Map.entry("list", List.of(42)),
|
||||
Map.entry("map", Map.of("key", "value"))
|
||||
);
|
||||
|
||||
VALUES_WITH_SCHEMA.put("magic", 42);
|
||||
VALUES_WITH_SCHEMA.put("bool", true);
|
||||
VALUES_WITH_SCHEMA.put("byte", (byte) 42);
|
||||
VALUES_WITH_SCHEMA.put("short", (short) 42);
|
||||
VALUES_WITH_SCHEMA.put("int", 42);
|
||||
VALUES_WITH_SCHEMA.put("long", 42L);
|
||||
VALUES_WITH_SCHEMA.put("float", 42f);
|
||||
VALUES_WITH_SCHEMA.put("double", 42d);
|
||||
VALUES_WITH_SCHEMA.put("string", "hmm");
|
||||
VALUES_WITH_SCHEMA.put("date", new Date());
|
||||
VALUES_WITH_SCHEMA.put("time", new Date());
|
||||
VALUES_WITH_SCHEMA.put("timestamp", new Date());
|
||||
VALUES_WITH_SCHEMA.put("decimal", new BigDecimal(42));
|
||||
VALUES_WITH_SCHEMA.put("array", Arrays.asList(1, 2, 3));
|
||||
VALUES_WITH_SCHEMA.put("map", Collections.singletonMap("what", "what"));
|
||||
VALUES_WITH_SCHEMA.put("withDefault", null);
|
||||
}
|
||||
private static final Struct VALUES_WITH_SCHEMA = new Struct(SCHEMA)
|
||||
.put("magic", 42)
|
||||
.put("bool", true)
|
||||
.put("byte", (byte) 42)
|
||||
.put("short", (short) 42)
|
||||
.put("int", 42)
|
||||
.put("long", 42L)
|
||||
.put("float", 42f)
|
||||
.put("double", 42d)
|
||||
.put("string", "hmm")
|
||||
.put("date", new Date())
|
||||
.put("time", new Date())
|
||||
.put("timestamp", new Date())
|
||||
.put("decimal", new BigDecimal(42))
|
||||
.put("array", List.of(1, 2, 3))
|
||||
.put("map", Map.of("what", "what"))
|
||||
.put("withDefault", null);
|
||||
|
||||
private static MaskField<SinkRecord> transform(List<String> fields, String replacement) {
|
||||
final MaskField<SinkRecord> xform = new MaskField.Value<>();
|
||||
|
@ -117,20 +113,20 @@ public class MaskFieldTest {
|
|||
|
||||
private static void checkReplacementWithSchema(String maskField, Object replacement) {
|
||||
SinkRecord record = record(SCHEMA, VALUES_WITH_SCHEMA);
|
||||
final Struct updatedValue = (Struct) transform(singletonList(maskField), String.valueOf(replacement)).apply(record).value();
|
||||
final Struct updatedValue = (Struct) transform(List.of(maskField), String.valueOf(replacement)).apply(record).value();
|
||||
assertEquals(replacement, updatedValue.get(maskField), "Invalid replacement for " + maskField + " value");
|
||||
}
|
||||
|
||||
private static void checkReplacementSchemaless(String maskField, Object replacement) {
|
||||
checkReplacementSchemaless(singletonList(maskField), replacement);
|
||||
checkReplacementSchemaless(List.of(maskField), replacement);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static void checkReplacementSchemaless(List<String> maskFields, Object replacement) {
|
||||
SinkRecord record = record(null, VALUES);
|
||||
final Map<String, Object> updatedValue = (Map) transform(maskFields, String.valueOf(replacement))
|
||||
.apply(record)
|
||||
.value();
|
||||
.apply(record)
|
||||
.value();
|
||||
for (String maskField : maskFields) {
|
||||
assertEquals(replacement, updatedValue.get(maskField), "Invalid replacement for " + maskField + " value");
|
||||
}
|
||||
|
@ -154,8 +150,8 @@ public class MaskFieldTest {
|
|||
assertEquals(new Date(0), updatedValue.get("date"));
|
||||
assertEquals(BigInteger.ZERO, updatedValue.get("bigint"));
|
||||
assertEquals(BigDecimal.ZERO, updatedValue.get("bigdec"));
|
||||
assertEquals(Collections.emptyList(), updatedValue.get("list"));
|
||||
assertEquals(Collections.emptyMap(), updatedValue.get("map"));
|
||||
assertEquals(List.of(), updatedValue.get("list"));
|
||||
assertEquals(Map.of(), updatedValue.get("map"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -182,8 +178,8 @@ public class MaskFieldTest {
|
|||
assertEquals(new Date(0), updatedValue.get("time"));
|
||||
assertEquals(new Date(0), updatedValue.get("timestamp"));
|
||||
assertEquals(BigDecimal.ZERO, updatedValue.get("decimal"));
|
||||
assertEquals(Collections.emptyList(), updatedValue.get("array"));
|
||||
assertEquals(Collections.emptyMap(), updatedValue.get("map"));
|
||||
assertEquals(List.of(), updatedValue.get("array"));
|
||||
assertEquals(Map.of(), updatedValue.get("map"));
|
||||
assertEquals(null, updatedValue.getWithoutDefault("withDefault"));
|
||||
}
|
||||
|
||||
|
@ -206,10 +202,10 @@ public class MaskFieldTest {
|
|||
Class<DataException> exClass = DataException.class;
|
||||
|
||||
assertThrows(exClass, () -> checkReplacementSchemaless("date", new Date()), exMessage);
|
||||
assertThrows(exClass, () -> checkReplacementSchemaless(Arrays.asList("int", "date"), new Date()), exMessage);
|
||||
assertThrows(exClass, () -> checkReplacementSchemaless(List.of("int", "date"), new Date()), exMessage);
|
||||
assertThrows(exClass, () -> checkReplacementSchemaless("bool", false), exMessage);
|
||||
assertThrows(exClass, () -> checkReplacementSchemaless("list", singletonList("123")), exMessage);
|
||||
assertThrows(exClass, () -> checkReplacementSchemaless("map", Collections.singletonMap("123", "321")), exMessage);
|
||||
assertThrows(exClass, () -> checkReplacementSchemaless("list", List.of("123")), exMessage);
|
||||
assertThrows(exClass, () -> checkReplacementSchemaless("map", Map.of("123", "321")), exMessage);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -231,7 +227,7 @@ public class MaskFieldTest {
|
|||
|
||||
assertThrows(exClass, () -> checkReplacementWithSchema("time", new Date()), exMessage);
|
||||
assertThrows(exClass, () -> checkReplacementWithSchema("timestamp", new Date()), exMessage);
|
||||
assertThrows(exClass, () -> checkReplacementWithSchema("array", singletonList(123)), exMessage);
|
||||
assertThrows(exClass, () -> checkReplacementWithSchema("array", List.of(123)), exMessage);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -249,7 +245,7 @@ public class MaskFieldTest {
|
|||
assertThrows(exClass, () -> checkReplacementSchemaless("bigdec", "foo"), exMessage);
|
||||
assertThrows(exClass, () -> checkReplacementSchemaless("int", new Date()), exMessage);
|
||||
assertThrows(exClass, () -> checkReplacementSchemaless("int", new Object()), exMessage);
|
||||
assertThrows(exClass, () -> checkReplacementSchemaless(Arrays.asList("string", "int"), "foo"), exMessage);
|
||||
assertThrows(exClass, () -> checkReplacementSchemaless(List.of("string", "int"), "foo"), exMessage);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -259,17 +255,17 @@ public class MaskFieldTest {
|
|||
|
||||
@Test
|
||||
public void testNullListAndMapReplacementsAreMutable() {
|
||||
final List<String> maskFields = Arrays.asList("array", "map");
|
||||
final List<String> maskFields = List.of("array", "map");
|
||||
final Struct updatedValue = (Struct) transform(maskFields, null).apply(record(SCHEMA, VALUES_WITH_SCHEMA)).value();
|
||||
@SuppressWarnings("unchecked") List<Integer> actualList = (List<Integer>) updatedValue.get("array");
|
||||
assertEquals(Collections.emptyList(), actualList);
|
||||
assertEquals(List.of(), actualList);
|
||||
actualList.add(0);
|
||||
assertEquals(Collections.singletonList(0), actualList);
|
||||
assertEquals(List.of(0), actualList);
|
||||
|
||||
@SuppressWarnings("unchecked") Map<String, String> actualMap = (Map<String, String>) updatedValue.get("map");
|
||||
assertEquals(Collections.emptyMap(), actualMap);
|
||||
assertEquals(Map.of(), actualMap);
|
||||
actualMap.put("k", "v");
|
||||
assertEquals(Collections.singletonMap("k", "v"), actualMap);
|
||||
assertEquals(Map.of("k", "v"), actualMap);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -30,7 +30,6 @@ import org.junit.jupiter.params.ParameterizedTest;
|
|||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Stream;
|
||||
|
@ -57,7 +56,7 @@ public class SetSchemaMetadataTest {
|
|||
|
||||
@Test
|
||||
public void schemaNameUpdate() {
|
||||
xform.configure(Collections.singletonMap("schema.name", "foo"));
|
||||
xform.configure(Map.of("schema.name", "foo"));
|
||||
final SinkRecord record = new SinkRecord("", 0, null, null, SchemaBuilder.struct().build(), null, 0);
|
||||
final SinkRecord updatedRecord = xform.apply(record);
|
||||
assertEquals("foo", updatedRecord.valueSchema().name());
|
||||
|
@ -65,7 +64,7 @@ public class SetSchemaMetadataTest {
|
|||
|
||||
@Test
|
||||
public void schemaVersionUpdate() {
|
||||
xform.configure(Collections.singletonMap("schema.version", 42));
|
||||
xform.configure(Map.of("schema.version", 42));
|
||||
final SinkRecord record = new SinkRecord("", 0, null, null, SchemaBuilder.struct().build(), null, 0);
|
||||
final SinkRecord updatedRecord = xform.apply(record);
|
||||
assertEquals(42, updatedRecord.valueSchema().version());
|
||||
|
|
|
@ -34,7 +34,6 @@ import org.junit.jupiter.params.provider.Arguments;
|
|||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
|
||||
import java.util.Calendar;
|
||||
import java.util.Collections;
|
||||
import java.util.GregorianCalendar;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
@ -108,13 +107,13 @@ public class TimestampConverterTest {
|
|||
|
||||
@Test
|
||||
public void testConfigNoTargetType() {
|
||||
assertThrows(ConfigException.class, () -> xformValue.configure(Collections.emptyMap()));
|
||||
assertThrows(ConfigException.class, () -> xformValue.configure(Map.of()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConfigInvalidTargetType() {
|
||||
assertThrows(ConfigException.class,
|
||||
() -> xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "invalid")));
|
||||
() -> xformValue.configure(Map.of(TimestampConverter.TARGET_TYPE_CONFIG, "invalid")));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -136,7 +135,7 @@ public class TimestampConverterTest {
|
|||
@Test
|
||||
public void testConfigMissingFormat() {
|
||||
assertThrows(ConfigException.class,
|
||||
() -> xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "string")));
|
||||
() -> xformValue.configure(Map.of(TimestampConverter.TARGET_TYPE_CONFIG, "string")));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -151,7 +150,7 @@ public class TimestampConverterTest {
|
|||
|
||||
@Test
|
||||
public void testSchemalessIdentity() {
|
||||
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
|
||||
xformValue.configure(Map.of(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
|
||||
SourceRecord transformed = xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME.getTime()));
|
||||
|
||||
assertNull(transformed.valueSchema());
|
||||
|
@ -160,7 +159,7 @@ public class TimestampConverterTest {
|
|||
|
||||
@Test
|
||||
public void testSchemalessTimestampToDate() {
|
||||
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Date"));
|
||||
xformValue.configure(Map.of(TimestampConverter.TARGET_TYPE_CONFIG, "Date"));
|
||||
SourceRecord transformed = xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME.getTime()));
|
||||
|
||||
assertNull(transformed.valueSchema());
|
||||
|
@ -169,7 +168,7 @@ public class TimestampConverterTest {
|
|||
|
||||
@Test
|
||||
public void testSchemalessTimestampToTime() {
|
||||
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Time"));
|
||||
xformValue.configure(Map.of(TimestampConverter.TARGET_TYPE_CONFIG, "Time"));
|
||||
SourceRecord transformed = xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME.getTime()));
|
||||
|
||||
assertNull(transformed.valueSchema());
|
||||
|
@ -178,7 +177,7 @@ public class TimestampConverterTest {
|
|||
|
||||
@Test
|
||||
public void testSchemalessTimestampToUnix() {
|
||||
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "unix"));
|
||||
xformValue.configure(Map.of(TimestampConverter.TARGET_TYPE_CONFIG, "unix"));
|
||||
SourceRecord transformed = xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME.getTime()));
|
||||
|
||||
assertNull(transformed.valueSchema());
|
||||
|
@ -202,7 +201,7 @@ public class TimestampConverterTest {
|
|||
|
||||
@Test
|
||||
public void testSchemalessDateToTimestamp() {
|
||||
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
|
||||
xformValue.configure(Map.of(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
|
||||
SourceRecord transformed = xformValue.apply(createRecordSchemaless(DATE.getTime()));
|
||||
|
||||
assertNull(transformed.valueSchema());
|
||||
|
@ -212,7 +211,7 @@ public class TimestampConverterTest {
|
|||
|
||||
@Test
|
||||
public void testSchemalessTimeToTimestamp() {
|
||||
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
|
||||
xformValue.configure(Map.of(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
|
||||
SourceRecord transformed = xformValue.apply(createRecordSchemaless(TIME.getTime()));
|
||||
|
||||
assertNull(transformed.valueSchema());
|
||||
|
@ -222,7 +221,7 @@ public class TimestampConverterTest {
|
|||
|
||||
@Test
|
||||
public void testSchemalessUnixToTimestamp() {
|
||||
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
|
||||
xformValue.configure(Map.of(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
|
||||
SourceRecord transformed = xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME_UNIX));
|
||||
|
||||
assertNull(transformed.valueSchema());
|
||||
|
@ -246,7 +245,7 @@ public class TimestampConverterTest {
|
|||
|
||||
@Test
|
||||
public void testWithSchemaIdentity() {
|
||||
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
|
||||
xformValue.configure(Map.of(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
|
||||
SourceRecord transformed = xformValue.apply(createRecordWithSchema(Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
|
||||
|
||||
assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
|
||||
|
@ -255,7 +254,7 @@ public class TimestampConverterTest {
|
|||
|
||||
@Test
|
||||
public void testWithSchemaTimestampToDate() {
|
||||
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Date"));
|
||||
xformValue.configure(Map.of(TimestampConverter.TARGET_TYPE_CONFIG, "Date"));
|
||||
SourceRecord transformed = xformValue.apply(createRecordWithSchema(Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
|
||||
|
||||
assertEquals(Date.SCHEMA, transformed.valueSchema());
|
||||
|
@ -264,7 +263,7 @@ public class TimestampConverterTest {
|
|||
|
||||
@Test
|
||||
public void testWithSchemaTimestampToTime() {
|
||||
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Time"));
|
||||
xformValue.configure(Map.of(TimestampConverter.TARGET_TYPE_CONFIG, "Time"));
|
||||
SourceRecord transformed = xformValue.apply(createRecordWithSchema(Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
|
||||
|
||||
assertEquals(Time.SCHEMA, transformed.valueSchema());
|
||||
|
@ -273,7 +272,7 @@ public class TimestampConverterTest {
|
|||
|
||||
@Test
|
||||
public void testWithSchemaTimestampToUnix() {
|
||||
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "unix"));
|
||||
xformValue.configure(Map.of(TimestampConverter.TARGET_TYPE_CONFIG, "unix"));
|
||||
SourceRecord transformed = xformValue.apply(createRecordWithSchema(Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
|
||||
|
||||
assertEquals(Schema.INT64_SCHEMA, transformed.valueSchema());
|
||||
|
@ -348,7 +347,7 @@ public class TimestampConverterTest {
|
|||
|
||||
@Test
|
||||
public void testWithSchemaDateToTimestamp() {
|
||||
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
|
||||
xformValue.configure(Map.of(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
|
||||
SourceRecord transformed = xformValue.apply(createRecordWithSchema(Date.SCHEMA, DATE.getTime()));
|
||||
|
||||
assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
|
||||
|
@ -358,7 +357,7 @@ public class TimestampConverterTest {
|
|||
|
||||
@Test
|
||||
public void testWithSchemaTimeToTimestamp() {
|
||||
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
|
||||
xformValue.configure(Map.of(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
|
||||
SourceRecord transformed = xformValue.apply(createRecordWithSchema(Time.SCHEMA, TIME.getTime()));
|
||||
|
||||
assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
|
||||
|
@ -368,7 +367,7 @@ public class TimestampConverterTest {
|
|||
|
||||
@Test
|
||||
public void testWithSchemaUnixToTimestamp() {
|
||||
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
|
||||
xformValue.configure(Map.of(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
|
||||
SourceRecord transformed = xformValue.apply(createRecordWithSchema(Schema.INT64_SCHEMA, DATE_PLUS_TIME_UNIX));
|
||||
|
||||
assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
|
||||
|
@ -530,11 +529,11 @@ public class TimestampConverterTest {
|
|||
config.put(TimestampConverter.FIELD_CONFIG, "ts");
|
||||
xformValue.configure(config);
|
||||
|
||||
Object value = Collections.singletonMap("ts", DATE_PLUS_TIME.getTime());
|
||||
Object value = Map.of("ts", DATE_PLUS_TIME.getTime());
|
||||
SourceRecord transformed = xformValue.apply(createRecordSchemaless(value));
|
||||
|
||||
assertNull(transformed.valueSchema());
|
||||
assertEquals(Collections.singletonMap("ts", DATE.getTime()), transformed.value());
|
||||
assertEquals(Map.of("ts", DATE.getTime()), transformed.value());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -590,7 +589,7 @@ public class TimestampConverterTest {
|
|||
.build();
|
||||
|
||||
assertEquals(expectedSchema, transformed.valueSchema());
|
||||
assertEquals(null, ((Struct) transformed.value()).get("ts"));
|
||||
assertNull(((Struct) transformed.value()).get("ts"));
|
||||
assertEquals("test", ((Struct) transformed.value()).get("other"));
|
||||
}
|
||||
|
||||
|
@ -716,7 +715,7 @@ public class TimestampConverterTest {
|
|||
|
||||
@Test
|
||||
public void testKey() {
|
||||
xformKey.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
|
||||
xformKey.configure(Map.of(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
|
||||
SourceRecord transformed = xformKey.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime(), null, null));
|
||||
|
||||
assertNull(transformed.keySchema());
|
||||
|
|
|
@ -23,7 +23,7 @@ import org.junit.jupiter.api.AfterEach;
|
|||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
|
@ -32,7 +32,7 @@ public class TimestampRouterTest {
|
|||
@BeforeEach
|
||||
public void setup() {
|
||||
xform = new TimestampRouter<>();
|
||||
xform.configure(Collections.emptyMap()); // defaults
|
||||
xform.configure(Map.of()); // defaults
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
|
|
|
@ -29,7 +29,6 @@ import org.junit.jupiter.params.ParameterizedTest;
|
|||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Stream;
|
||||
|
@ -55,7 +54,7 @@ public class ValueToKeyTest {
|
|||
|
||||
@Test
|
||||
public void schemaless() {
|
||||
xform.configure(Collections.singletonMap("fields", "a,b"));
|
||||
xform.configure(Map.of("fields", "a,b"));
|
||||
|
||||
final HashMap<String, Integer> value = new HashMap<>();
|
||||
value.put("a", 1);
|
||||
|
@ -75,7 +74,7 @@ public class ValueToKeyTest {
|
|||
|
||||
@Test
|
||||
public void withSchema() {
|
||||
xform.configure(Collections.singletonMap("fields", "a,b"));
|
||||
xform.configure(Map.of("fields", "a,b"));
|
||||
|
||||
final Schema valueSchema = SchemaBuilder.struct()
|
||||
.field("a", Schema.INT32_SCHEMA)
|
||||
|
@ -106,7 +105,7 @@ public class ValueToKeyTest {
|
|||
|
||||
@Test
|
||||
public void nonExistingField() {
|
||||
xform.configure(Collections.singletonMap("fields", "not_exist"));
|
||||
xform.configure(Map.of("fields", "not_exist"));
|
||||
|
||||
final Schema valueSchema = SchemaBuilder.struct()
|
||||
.field("a", Schema.INT32_SCHEMA)
|
||||
|
|
|
@ -35,18 +35,18 @@ public class FieldSyntaxVersionTest {
|
|||
@Test
|
||||
void shouldAppendConfigToDef() {
|
||||
ConfigDef def = FieldSyntaxVersion.appendConfigTo(new ConfigDef());
|
||||
assertEquals(def.configKeys().size(), 1);
|
||||
assertEquals(1, def.configKeys().size());
|
||||
final ConfigDef.ConfigKey configKey = def.configKeys().get("field.syntax.version");
|
||||
assertEquals(configKey.name, "field.syntax.version");
|
||||
assertEquals(configKey.defaultValue, "V1");
|
||||
assertEquals("field.syntax.version", configKey.name);
|
||||
assertEquals("V1", configKey.defaultValue);
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldFailWhenAppendConfigToDefAgain() {
|
||||
ConfigDef def = FieldSyntaxVersion.appendConfigTo(new ConfigDef());
|
||||
assertEquals(def.configKeys().size(), 1);
|
||||
assertEquals(1, def.configKeys().size());
|
||||
ConfigException e = assertThrows(ConfigException.class, () -> FieldSyntaxVersion.appendConfigTo(def));
|
||||
assertEquals(e.getMessage(), "Configuration field.syntax.version is defined twice.");
|
||||
assertEquals("Configuration field.syntax.version is defined twice.", e.getMessage());
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
|
|
|
@ -26,13 +26,11 @@ import org.apache.kafka.connect.transforms.util.SimpleConfig;
|
|||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.Collections.singletonList;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
@ -58,16 +56,16 @@ public class HasHeaderKeyTest {
|
|||
@Test
|
||||
public void testConfig() {
|
||||
HasHeaderKey<SourceRecord> predicate = new HasHeaderKey<>();
|
||||
predicate.config().validate(Collections.singletonMap("name", "foo"));
|
||||
predicate.config().validate(Map.of("name", "foo"));
|
||||
|
||||
List<ConfigValue> configs = predicate.config().validate(Collections.singletonMap("name", ""));
|
||||
assertEquals(singletonList("Invalid value for configuration name: String must be non-empty"), configs.get(0).errorMessages());
|
||||
List<ConfigValue> configs = predicate.config().validate(Map.of("name", ""));
|
||||
assertEquals(List.of("Invalid value for configuration name: String must be non-empty"), configs.get(0).errorMessages());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTest() {
|
||||
HasHeaderKey<SourceRecord> predicate = new HasHeaderKey<>();
|
||||
predicate.configure(Collections.singletonMap("name", "foo"));
|
||||
predicate.configure(Map.of("name", "foo"));
|
||||
|
||||
assertTrue(predicate.test(recordWithHeaders("foo")));
|
||||
assertTrue(predicate.test(recordWithHeaders("foo", "bar")));
|
||||
|
|
|
@ -23,7 +23,6 @@ import org.apache.kafka.connect.transforms.util.SimpleConfig;
|
|||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -61,9 +60,9 @@ public class TopicNameMatchesTest {
|
|||
@Test
|
||||
public void testConfig() {
|
||||
TopicNameMatches<SourceRecord> predicate = new TopicNameMatches<>();
|
||||
predicate.config().validate(Collections.singletonMap("pattern", "my-prefix-.*"));
|
||||
predicate.config().validate(Map.of("pattern", "my-prefix-.*"));
|
||||
|
||||
List<ConfigValue> configs = predicate.config().validate(Collections.singletonMap("pattern", "*"));
|
||||
List<ConfigValue> configs = predicate.config().validate(Map.of("pattern", "*"));
|
||||
List<String> errorMsgs = configs.get(0).errorMessages();
|
||||
assertEquals(1, errorMsgs.size());
|
||||
assertTrue(errorMsgs.get(0).contains("Invalid regex"));
|
||||
|
@ -72,7 +71,7 @@ public class TopicNameMatchesTest {
|
|||
@Test
|
||||
public void testTest() {
|
||||
TopicNameMatches<SourceRecord> predicate = new TopicNameMatches<>();
|
||||
predicate.configure(Collections.singletonMap("pattern", "my-prefix-.*"));
|
||||
predicate.configure(Map.of("pattern", "my-prefix-.*"));
|
||||
|
||||
assertTrue(predicate.test(recordWithTopicName("my-prefix-")));
|
||||
assertTrue(predicate.test(recordWithTopicName("my-prefix-foo")));
|
||||
|
|
|
@ -20,7 +20,7 @@ import org.apache.kafka.common.config.ConfigException;
|
|||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
|
@ -34,11 +34,11 @@ public class NonEmptyListValidatorTest {
|
|||
@Test
|
||||
public void testEmptyList() {
|
||||
assertThrows(ConfigException.class,
|
||||
() -> new NonEmptyListValidator().ensureValid("foo", Collections.emptyList()));
|
||||
() -> new NonEmptyListValidator().ensureValid("foo", List.of()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidList() {
|
||||
new NonEmptyListValidator().ensureValid("foo", Collections.singletonList("foo"));
|
||||
new NonEmptyListValidator().ensureValid("foo", List.of("foo"));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue