KAFKA-14226: Introduce FieldPath abstraction and nested path support for ExtractField SMT (#15379)

Reviewers: Chris Egerton <chrise@aiven.io>
This commit is contained in:
Jorge Esteban Quilcate Otoya 2024-05-07 21:07:18 +03:00 committed by GitHub
parent 05df10449e
commit a4c6cefd10
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 770 additions and 9 deletions

View File

@@ -658,6 +658,7 @@
<allow class="org.apache.kafka.connect.source.SourceRecord" /> <allow class="org.apache.kafka.connect.source.SourceRecord" />
<allow class="org.apache.kafka.connect.sink.SinkRecord" /> <allow class="org.apache.kafka.connect.sink.SinkRecord" />
<allow pkg="org.apache.kafka.connect.transforms.util" /> <allow pkg="org.apache.kafka.connect.transforms.util" />
<allow pkg="org.apache.kafka.connect.transforms.field" />
</subpackage> </subpackage>
</subpackage> </subpackage>

View File

@ -23,6 +23,8 @@ import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.field.SingleFieldPath;
import org.apache.kafka.connect.transforms.field.FieldSyntaxVersion;
import org.apache.kafka.connect.transforms.util.SimpleConfig; import org.apache.kafka.connect.transforms.util.SimpleConfig;
import java.util.Map; import java.util.Map;
@ -40,12 +42,20 @@ public abstract class ExtractField<R extends ConnectRecord<R>> implements Transf
private static final String FIELD_CONFIG = "field"; private static final String FIELD_CONFIG = "field";
public static final ConfigDef CONFIG_DEF = new ConfigDef() public static final ConfigDef CONFIG_DEF = FieldSyntaxVersion.appendConfigTo(
.define(FIELD_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "Field name to extract."); new ConfigDef()
.define(
FIELD_CONFIG,
ConfigDef.Type.STRING,
ConfigDef.NO_DEFAULT_VALUE,
ConfigDef.Importance.MEDIUM,
"Field name to extract."
));
private static final String PURPOSE = "field extraction"; private static final String PURPOSE = "field extraction";
private String fieldName; private SingleFieldPath fieldPath;
private String originalPath;
@Override @Override
public String version() { public String version() {
@ -55,7 +65,8 @@ public abstract class ExtractField<R extends ConnectRecord<R>> implements Transf
@Override @Override
public void configure(Map<String, ?> props) { public void configure(Map<String, ?> props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props); final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
fieldName = config.getString(FIELD_CONFIG); originalPath = config.getString(FIELD_CONFIG);
fieldPath = new SingleFieldPath(originalPath, FieldSyntaxVersion.fromConfig(config));
} }
@Override @Override
@ -63,16 +74,16 @@ public abstract class ExtractField<R extends ConnectRecord<R>> implements Transf
final Schema schema = operatingSchema(record); final Schema schema = operatingSchema(record);
if (schema == null) { if (schema == null) {
final Map<String, Object> value = requireMapOrNull(operatingValue(record), PURPOSE); final Map<String, Object> value = requireMapOrNull(operatingValue(record), PURPOSE);
return newRecord(record, null, value == null ? null : value.get(fieldName)); return newRecord(record, null, value == null ? null : fieldPath.valueFrom(value));
} else { } else {
final Struct value = requireStructOrNull(operatingValue(record), PURPOSE); final Struct value = requireStructOrNull(operatingValue(record), PURPOSE);
Field field = schema.field(fieldName); Field field = fieldPath.fieldFrom(schema);
if (field == null) { if (field == null) {
throw new IllegalArgumentException("Unknown field: " + fieldName); throw new IllegalArgumentException("Unknown field: " + originalPath);
} }
return newRecord(record, field.schema(), value == null ? null : value.get(fieldName)); return newRecord(record, field.schema(), value == null ? null : fieldPath.valueFrom(value));
} }
} }

View File

@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.transforms.field;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Utils;
import java.util.Locale;
/**
 * Defines semantics of field paths by versioning.
 *
 * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures">KIP-821</a>
 * @see SingleFieldPath
 */
public enum FieldSyntaxVersion {
    /**
     * No support for accessing nested fields: only attributes at the root of the
     * data structure can be addressed. Backward compatible (i.e. before KIP-821).
     */
    V1,
    /**
     * Supports accessing nested fields using dotted notation
     * (with backtick pairs to wrap field names that include dots).
     */
    V2;

    public static final String FIELD_SYNTAX_VERSION_CONFIG = "field.syntax.version";
    public static final String FIELD_SYNTAX_VERSION_DOC =
        "Defines the version of the syntax to access fields. "
            + "If set to `V1`, then the field paths are limited to access the elements at the root level of the struct or map. "
            + "If set to `V2`, the syntax will support accessing nested elements. "
            + "To access nested elements, dotted notation is used. "
            + "If dots are already included in the field name, "
            + "then backtick pairs can be used to wrap field names containing dots. "
            + "E.g. to access the subfield `baz` from a field named \"foo.bar\" in a struct/map "
            + "the following format can be used to access its elements: \"`foo.bar`.baz\".";
    public static final String FIELD_SYNTAX_VERSION_DEFAULT_VALUE = V1.name();

    /**
     * Extends an existing config definition with the field syntax version key.
     * To be used by transforms supporting nested fields.
     *
     * @param configDef existing config definition
     * @return the same config definition, now including the field syntax version key
     */
    public static ConfigDef appendConfigTo(ConfigDef configDef) {
        return configDef.define(
            FIELD_SYNTAX_VERSION_CONFIG,
            ConfigDef.Type.STRING,
            FIELD_SYNTAX_VERSION_DEFAULT_VALUE,
            // case-insensitive: "v1"/"V1" are both accepted at validation time
            ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(FieldSyntaxVersion.class)),
            ConfigDef.Importance.HIGH,
            FIELD_SYNTAX_VERSION_DOC);
    }

    /**
     * Reads the syntax version from config values.
     *
     * @param config config including a value for the field syntax version key
     * @return field syntax version
     * @throws ConfigException if the configured value is not a known version
     */
    public static FieldSyntaxVersion fromConfig(AbstractConfig config) {
        final String rawVersion = config.getString(FIELD_SYNTAX_VERSION_CONFIG);
        try {
            // normalize case with a fixed locale so parsing is locale-independent
            return valueOf(rawVersion.toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new ConfigException(FIELD_SYNTAX_VERSION_CONFIG, rawVersion, "Unrecognized field syntax version");
        }
    }
}

View File

@ -0,0 +1,242 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.transforms.field;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import static org.apache.kafka.connect.transforms.util.Requirements.requireMapOrNull;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStructOrNull;
/**
 * A SingleFieldPath is composed of one or more field names, known as path steps,
 * to access values within a data object (either {@code Struct} or {@code Map<String, Object>}).
 *
 * <p>The field path semantics are defined by the {@link FieldSyntaxVersion syntax version}.
 *
 * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-821%3A+Connect+Transforms+support+for+nested+structures">KIP-821</a>
 * @see FieldSyntaxVersion
 */
public class SingleFieldPath {
    // Invariants:
    // - A field path can contain one or more steps
    private static final char BACKTICK = '`';
    private static final char DOT = '.';
    private static final char BACKSLASH = '\\';
    // Syntax version the path text was parsed with; used in toString only, not equality.
    private final FieldSyntaxVersion version;
    // Ordered field names from root to leaf; immutable once built.
    private final List<String> steps;
    /**
     * Parses a path from its text representation.
     *
     * @param pathText raw path text, e.g. {@code foo.bar}
     * @param version  syntax version: V1 keeps the whole text as a single step
     *                 (backward compatibility), V2 parses nested-path notation
     * @throws ConfigException if version is V2 and the path contains an incomplete backtick pair
     */
    public SingleFieldPath(String pathText, FieldSyntaxVersion version) {
        this.version = version;
        switch (version) {
            case V1: // backward compatibility
                this.steps = Collections.singletonList(pathText);
                break;
            case V2:
                this.steps = buildFieldPathV2(pathText);
                break;
            default:
                throw new IllegalArgumentException("Unknown syntax version: " + version);
        }
    }
    /**
     * Splits a V2 path into steps. Dots separate steps; a backtick pair (opening at the
     * start of a step, closing right before a dot or at the end of the path) wraps a
     * field name that itself contains dots. A backslash right before a closing backtick
     * escapes it, making the backtick part of the field name.
     *
     * @throws ConfigException if an opening backtick is never validly closed
     */
    private static List<String> buildFieldPathV2(String path) {
        final List<String> steps = new ArrayList<>();
        // path character index to track backticks and dots and break path into steps
        int idx = 0;
        while (idx < path.length() && idx >= 0) {
            if (path.charAt(idx) != BACKTICK) {
                // plain (unquoted) step: runs until the next dot or the end of the path
                final int start = idx;
                idx = path.indexOf(String.valueOf(DOT), idx);
                if (idx >= 0) { // get path step and move forward
                    String field = path.substring(start, idx);
                    steps.add(field);
                    idx++;
                } else { // add all
                    String field = path.substring(start);
                    steps.add(field);
                }
            } else { // has backtick
                int backtickAt = idx;
                idx++;
                StringBuilder field = new StringBuilder();
                int start = idx;
                while (true) {
                    // find closing backtick
                    idx = path.indexOf(String.valueOf(BACKTICK), idx);
                    if (idx == -1) { // if not found, then fail
                        failWhenIncompleteBacktickPair(path, backtickAt);
                        // (never reached: the call above always throws)
                    }
                    // backtick escaped if right after backslash
                    boolean escaped = path.charAt(idx - 1) == BACKSLASH;
                    if (idx >= path.length() - 1) { // at the end of path
                        if (escaped) { // but escaped, then fail
                            failWhenIncompleteBacktickPair(path, backtickAt);
                        }
                        field.append(path, start, idx);
                        // we've reached the end of the path, and the last character is the backtick
                        steps.add(field.toString());
                        idx++;
                        break;
                    }
                    if (path.charAt(idx + 1) != DOT) { // not followed by a dot
                        // this backtick isn't followed by a dot; include it in the field name, but continue
                        // looking for a matching backtick that is followed by a dot
                        idx++;
                        continue;
                    }
                    if (escaped) {
                        // this backtick was escaped; include it in the field name, but continue
                        // looking for an unescaped matching backtick
                        field.append(path, start, idx - 1)
                            .append(BACKTICK);
                        idx++;
                        start = idx;
                        continue;
                    }
                    // we've found our matching backtick
                    field.append(path, start, idx);
                    steps.add(field.toString());
                    idx += 2; // increment by two to include the backtick and the dot after it
                    break;
                }
            }
        }
        // add last step if last char is a dot
        if (!path.isEmpty() && path.charAt(path.length() - 1) == DOT)
            steps.add("");
        return Collections.unmodifiableList(steps);
    }
    // Always throws: reports the position of the unclosed backtick to the user.
    private static void failWhenIncompleteBacktickPair(String path, int backtickAt) {
        throw new ConfigException("Incomplete backtick pair in path: [" + path + "],"
            + " consider adding a backslash before backtick at position " + backtickAt
            + " to escape it");
    }
    /**
     * Access a {@code Field} at the current path within a schema {@code Schema}.
     * If the field is not found (including any missing intermediate step), then {@code null} is returned.
     */
    public Field fieldFrom(Schema schema) {
        if (schema == null) return null;
        Schema current = schema;
        // walk all steps but the last through the nested schemas
        for (String pathSegment : stepsWithoutLast()) {
            final Field field = current.field(pathSegment);
            if (field != null) {
                current = field.schema();
            } else {
                return null;
            }
        }
        return current.field(lastStep());
    }
    /**
     * Access a value at the current path within a schema-based {@code Struct}.
     * If the object is not found (missing field, or a null intermediate struct), then {@code null} is returned.
     */
    public Object valueFrom(Struct struct) {
        if (struct == null) return null;
        Struct current = struct;
        for (String pathSegment : stepsWithoutLast()) {
            // Check to see if the field actually exists
            if (current.schema().field(pathSegment) == null) {
                return null;
            }
            Object subValue = current.get(pathSegment);
            current = requireStructOrNull(subValue, "nested field access");
            if (current == null) return null;
        }
        // distinguish "field missing" (null) from "field present with null value" (also null here)
        if (current.schema().field(lastStep()) != null) {
            return current.get(lastStep());
        } else {
            return null;
        }
    }
    /**
     * Access a value at the current path within a schemaless {@code Map<String, Object>}.
     * If the object is not found (missing key, or a null/non-map intermediate value), then {@code null} is returned.
     */
    public Object valueFrom(Map<String, Object> map) {
        if (map == null) return null;
        Map<String, Object> current = map;
        for (String step : stepsWithoutLast()) {
            current = requireMapOrNull(current.get(step), "nested field access");
            if (current == null) return null;
        }
        return current.get(lastStep());
    }
    // For testing
    String[] path() {
        return steps.toArray(new String[0]);
    }
    // Final step, i.e. the field name resolved on the innermost container.
    private String lastStep() {
        return steps.get(lastStepIndex());
    }
    private int lastStepIndex() {
        return steps.size() - 1;
    }
    // All steps except the last; these must resolve to nested containers.
    private List<String> stepsWithoutLast() {
        return steps.subList(0, lastStepIndex());
    }
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        SingleFieldPath that = (SingleFieldPath) o;
        // note: version is not part of equality — two paths with the same steps are equal
        return Objects.equals(steps, that.steps);
    }
    @Override
    public int hashCode() {
        // consistent with equals: hash on steps only
        return Objects.hash(steps);
    }
    @Override
    public String toString() {
        return "SingleFieldPath{" +
            "version=" + version +
            ", path=" + String.join(".", steps) +
            '}';
    }
}

View File

@ -16,11 +16,13 @@
*/ */
package org.apache.kafka.connect.transforms; package org.apache.kafka.connect.transforms;
import java.util.HashMap;
import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.transforms.field.FieldSyntaxVersion;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -51,7 +53,22 @@ public class ExtractFieldTest {
} }
@Test @Test
public void testNullSchemaless() { public void schemalessAndNestedPath() {
Map<String, String> configs = new HashMap<>();
configs.put(FieldSyntaxVersion.FIELD_SYNTAX_VERSION_CONFIG, FieldSyntaxVersion.V2.name());
configs.put("field", "magic.foo");
xform.configure(configs);
final Map<String, Object> key = Collections.singletonMap("magic", Collections.singletonMap("foo", 42));
final SinkRecord record = new SinkRecord("test", 0, null, key, null, null, 0);
final SinkRecord transformedRecord = xform.apply(record);
assertNull(transformedRecord.keySchema());
assertEquals(42, transformedRecord.key());
}
@Test
public void nullSchemaless() {
xform.configure(Collections.singletonMap("field", "magic")); xform.configure(Collections.singletonMap("field", "magic"));
final Map<String, Object> key = null; final Map<String, Object> key = null;
@ -75,6 +92,23 @@ public class ExtractFieldTest {
assertEquals(42, transformedRecord.key()); assertEquals(42, transformedRecord.key());
} }
@Test
public void withSchemaAndNestedPath() {
Map<String, String> configs = new HashMap<>();
configs.put(FieldSyntaxVersion.FIELD_SYNTAX_VERSION_CONFIG, FieldSyntaxVersion.V2.name());
configs.put("field", "magic.foo");
xform.configure(configs);
final Schema fooSchema = SchemaBuilder.struct().field("foo", Schema.INT32_SCHEMA).build();
final Schema keySchema = SchemaBuilder.struct().field("magic", fooSchema).build();
final Struct key = new Struct(keySchema).put("magic", new Struct(fooSchema).put("foo", 42));
final SinkRecord record = new SinkRecord("test", 0, keySchema, key, null, null, 0);
final SinkRecord transformedRecord = xform.apply(record);
assertEquals(Schema.INT32_SCHEMA, transformedRecord.keySchema());
assertEquals(42, transformedRecord.key());
}
@Test @Test
public void testNullWithSchema() { public void testNullWithSchema() {
xform.configure(Collections.singletonMap("field", "magic")); xform.configure(Collections.singletonMap("field", "magic"));
@ -99,6 +133,21 @@ public class ExtractFieldTest {
assertNull(transformedRecord.key()); assertNull(transformedRecord.key());
} }
@Test
public void nonExistentNestedFieldSchemalessShouldReturnNull() {
Map<String, String> configs = new HashMap<>();
configs.put(FieldSyntaxVersion.FIELD_SYNTAX_VERSION_CONFIG, FieldSyntaxVersion.V2.name());
configs.put("field", "magic.nonexistent");
xform.configure(configs);
final Map<String, Object> key = Collections.singletonMap("magic", Collections.singletonMap("foo", 42));
final SinkRecord record = new SinkRecord("test", 0, null, key, null, null, 0);
final SinkRecord transformedRecord = xform.apply(record);
assertNull(transformedRecord.keySchema());
assertNull(transformedRecord.key());
}
@Test @Test
public void nonExistentFieldWithSchemaShouldFail() { public void nonExistentFieldWithSchemaShouldFail() {
xform.configure(Collections.singletonMap("field", "nonexistent")); xform.configure(Collections.singletonMap("field", "nonexistent"));
@ -115,6 +164,26 @@ public class ExtractFieldTest {
} }
} }
@Test
public void nonExistentNestedFieldWithSchemaShouldFail() {
Map<String, String> configs = new HashMap<>();
configs.put(FieldSyntaxVersion.FIELD_SYNTAX_VERSION_CONFIG, FieldSyntaxVersion.V2.name());
configs.put("field", "magic.nonexistent");
xform.configure(configs);
final Schema fooSchema = SchemaBuilder.struct().field("foo", Schema.INT32_SCHEMA).build();
final Schema keySchema = SchemaBuilder.struct().field("magic", fooSchema).build();
final Struct key = new Struct(keySchema).put("magic", new Struct(fooSchema).put("foo", 42));
final SinkRecord record = new SinkRecord("test", 0, keySchema, key, null, null, 0);
try {
xform.apply(record);
fail("Expected exception wasn't raised");
} catch (IllegalArgumentException iae) {
assertEquals("Unknown field: magic.nonexistent", iae.getMessage());
}
}
@Test @Test
public void testExtractFieldVersionRetrievedFromAppInfoParser() { public void testExtractFieldVersionRetrievedFromAppInfoParser() {
assertEquals(AppInfoParser.getVersion(), xform.version()); assertEquals(AppInfoParser.getVersion(), xform.version());

View File

@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.transforms.field;
import org.apache.kafka.common.config.ConfigException;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
// Tests for SingleFieldPath parsing: V1 single-step semantics and the V2
// dotted/backtick notation, including escaping and failure cases.
class FieldPathNotationTest {
    // expected steps for an empty path
    final static String[] EMPTY_PATH = new String[] {};
    @Test
    void shouldBuildV1WithDotsAndBacktickPair() {
        // Given v1
        // When path contains dots, then single step path
        assertParseV1("foo.bar.baz");
        // When path contains backticks, then single step path
        assertParseV1("foo`bar`");
        // When path contains dots and backticks, then single step path
        assertParseV1("foo.`bar.baz`");
    }
    @Test
    void shouldIncludeEmptyFieldNames() {
        // consecutive/leading/trailing dots produce empty-string steps
        assertParseV2("..", "", "", "");
        assertParseV2("foo..", "foo", "", "");
        assertParseV2(".bar.", "", "bar", "");
        assertParseV2("..baz", "", "", "baz");
    }
    @Test
    void shouldBuildV2WithEmptyPath() {
        // Given v2
        // When path is empty
        // Then build a path with no steps
        assertParseV2("", EMPTY_PATH);
    }
    @Test
    void shouldBuildV2WithoutDots() {
        // Given v2
        // When path without dots
        // Then build a single step path
        assertParseV2("foobarbaz", "foobarbaz");
    }
    @Test
    void shouldBuildV2WhenIncludesDots() {
        // Given v2 and fields without dots
        // When path includes dots
        // Then build a path with steps separated by dots
        assertParseV2("foo.bar.baz", "foo", "bar", "baz");
    }
    @Test
    void shouldBuildV2WithoutWrappingBackticks() {
        // Given v2 and fields without dots
        // When backticks are not wrapping a field name
        // Then build a single step path including backticks
        assertParseV2("foo`bar`baz", "foo`bar`baz");
    }
    @Test
    void shouldBuildV2WhenIncludesDotsAndBacktickPair() {
        // Given v2 and fields including dots
        // When backticks are wrapping a field name (i.e. within edges or between dots)
        // Then build a path with steps separated by dots and not including backticks
        assertParseV2("`foo.bar.baz`", "foo.bar.baz");
        assertParseV2("foo.`bar.baz`", "foo", "bar.baz");
        assertParseV2("`foo.bar`.baz", "foo.bar", "baz");
        assertParseV2("foo.`bar`.baz", "foo", "bar", "baz");
    }
    @Test
    void shouldBuildV2AndIgnoreBackticksThatAreNotWrapping() {
        // Given v2 and fields including dots and backticks
        // When backticks are wrapping a field name (i.e. within edges or between dots)
        // Then build a path with steps separated by dots and including non-wrapping backticks
        assertParseV2("foo.``bar.baz`", "foo", "`bar.baz");
        assertParseV2("foo.`bar.baz``", "foo", "bar.baz`");
        assertParseV2("foo.`ba`r.baz`", "foo", "ba`r.baz");
        assertParseV2("foo.ba`r.baz", "foo", "ba`r", "baz");
        assertParseV2("foo.``bar``.baz", "foo", "`bar`", "baz");
        assertParseV2("``foo.bar.baz``", "`foo.bar.baz`");
    }
    @Test
    void shouldBuildV2AndEscapeBackticks() {
        // Given v2 and fields including dots and backticks
        // When backticks are wrapping a field name (i.e. within edges or between dots)
        // and wrapping backticks that are part of the field name are escaped with backslashes
        // Then build a path with steps separated by dots and including escaped and non-wrapping backticks
        assertParseV2("foo.`bar\\`.baz`", "foo", "bar`.baz");
        assertParseV2("foo.`bar.`baz`", "foo", "bar.`baz");
        assertParseV2("foo.`bar\\`.`baz`", "foo", "bar`.`baz");
        assertParseV2("foo.`bar\\\\`.\\`baz`", "foo", "bar\\`.\\`baz");
    }
    @Test
    void shouldFailV2WhenIncompleteBackticks() {
        // Given v2
        // When backticks are not closed and not escaped
        // Then it should fail
        assertParseV2Error(
            "`foo.bar.baz",
            "Incomplete backtick pair in path: [`foo.bar.baz], consider adding a backslash before backtick at position 0 to escape it"
        );
        assertParseV2Error(
            "foo.`bar.baz",
            "Incomplete backtick pair in path: [foo.`bar.baz], consider adding a backslash before backtick at position 4 to escape it"
        );
        assertParseV2Error(
            "foo.bar.`baz",
            "Incomplete backtick pair in path: [foo.bar.`baz], consider adding a backslash before backtick at position 8 to escape it"
        );
        assertParseV2Error(
            "foo.bar.`baz\\`",
            "Incomplete backtick pair in path: [foo.bar.`baz\\`], consider adding a backslash before backtick at position 8 to escape it"
        );
    }
    // Asserts a V1 path always parses as exactly one step: the raw text itself.
    private void assertParseV1(String path) {
        assertArrayEquals(
            new String[] {path},
            new SingleFieldPath(path, FieldSyntaxVersion.V1).path());
    }
    // Asserts a V2 path parses into exactly the expected sequence of steps.
    private void assertParseV2(String inputPath, String... expectedSteps) {
        assertArrayEquals(
            expectedSteps,
            new SingleFieldPath(inputPath, FieldSyntaxVersion.V2).path()
        );
    }
    // Asserts V2 parsing fails with a ConfigException carrying the exact message.
    private void assertParseV2Error(String inputPath, String expectedMessage) {
        ConfigException exception = assertThrows(
            ConfigException.class,
            () -> new SingleFieldPath(inputPath, FieldSyntaxVersion.V2)
        );
        assertEquals(expectedMessage, exception.getMessage());
    }
}

View File

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.transforms.field;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
/**
 * Tests for {@link FieldSyntaxVersion}: config definition, duplicate registration,
 * parsing from config values, and rejection of unknown versions.
 */
public class FieldSyntaxVersionTest {
    @Test
    void shouldAppendConfigToDef() {
        ConfigDef def = FieldSyntaxVersion.appendConfigTo(new ConfigDef());
        // assertEquals takes (expected, actual); arguments were swapped before,
        // which made failure messages report the values backwards.
        assertEquals(1, def.configKeys().size());
        final ConfigDef.ConfigKey configKey = def.configKeys().get("field.syntax.version");
        assertEquals("field.syntax.version", configKey.name);
        assertEquals("V1", configKey.defaultValue);
    }

    @Test
    void shouldFailWhenAppendConfigToDefAgain() {
        ConfigDef def = FieldSyntaxVersion.appendConfigTo(new ConfigDef());
        assertEquals(1, def.configKeys().size());
        // Appending the same key twice must fail: ConfigDef rejects duplicate definitions.
        ConfigException e = assertThrows(ConfigException.class, () -> FieldSyntaxVersion.appendConfigTo(def));
        assertEquals("Configuration field.syntax.version is defined twice.", e.getMessage());
    }

    @ParameterizedTest
    @CsvSource({"v1,V1", "v2,V2", "V1,V1", "V2,V2"})
    void shouldGetVersionFromConfig(String input, FieldSyntaxVersion version) {
        Map<String, String> configs = new HashMap<>();
        configs.put("field.syntax.version", input);
        AbstractConfig config = new AbstractConfig(FieldSyntaxVersion.appendConfigTo(new ConfigDef()), configs);
        // version lookup is case-insensitive
        assertEquals(version, FieldSyntaxVersion.fromConfig(config));
    }

    @ParameterizedTest
    @ValueSource(strings = {"v3", "V 1", "v", "V 2", "2", "1"})
    void shouldFailWhenWrongVersionIsPassed(String input) {
        Map<String, String> configs = new HashMap<>();
        configs.put("field.syntax.version", input);
        // the validator already rejects unknown values at config construction time
        ConfigException e = assertThrows(ConfigException.class, () -> new AbstractConfig(FieldSyntaxVersion.appendConfigTo(new ConfigDef()), configs));
        assertEquals(
            "Invalid value " + input + " for configuration field.syntax.version: " +
                "String must be one of (case insensitive): V1, V2",
            e.getMessage());
    }
}

View File

@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.transforms.field;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
/**
 * Tests for {@link SingleFieldPath} resolution against schemas, structs, and schemaless maps.
 */
class SingleFieldPathTest {
    /** Shorthand for building a nested (V2) path. */
    private static SingleFieldPath pathV2(String path) {
        return new SingleFieldPath(path, FieldSyntaxVersion.V2);
    }

    @Test void shouldFindField() {
        Schema innerSchema = SchemaBuilder.struct().field("bar", Schema.INT32_SCHEMA).build();
        Schema rootSchema = SchemaBuilder.struct().field("foo", innerSchema).build();
        // nested and root-level paths both resolve to the schema's Field object
        assertEquals(innerSchema.field("bar"), pathV2("foo.bar").fieldFrom(rootSchema));
        assertEquals(rootSchema.field("foo"), pathV2("foo").fieldFrom(rootSchema));
    }

    @Test void shouldReturnNullFieldWhenFieldNotFound() {
        Schema innerSchema = SchemaBuilder.struct().field("bar", Schema.INT32_SCHEMA).build();
        Schema rootSchema = SchemaBuilder.struct().field("foo", innerSchema).build();
        // missing intermediate step, missing leaf, missing root, and null schema all yield null
        assertNull(pathV2("un.known").fieldFrom(rootSchema));
        assertNull(pathV2("foo.unknown").fieldFrom(rootSchema));
        assertNull(pathV2("unknown").fieldFrom(rootSchema));
        assertNull(pathV2("test").fieldFrom(null));
    }

    @Test void shouldFindValueInMap() {
        Map<String, Object> inner = new HashMap<>();
        inner.put("bar", 42);
        inner.put("baz", null);
        Map<String, Object> root = new HashMap<>();
        root.put("foo", inner);
        assertEquals(42, pathV2("foo.bar").valueFrom(root));
        // a key present with a null value resolves to null rather than failing
        assertNull(pathV2("foo.baz").valueFrom(root));
    }

    @Test void shouldReturnNullValueWhenFieldNotFoundInMap() {
        Map<String, Object> inner = new HashMap<>();
        inner.put("bar", 42);
        inner.put("baz", null);
        Map<String, Object> root = new HashMap<>();
        root.put("foo", inner);
        assertNull(pathV2("un.known").valueFrom(root));
        assertNull(pathV2("foo.unknown").valueFrom(root));
        assertNull(pathV2("unknown").valueFrom(root));
        // stepping through a null intermediate value also yields null
        assertNull(pathV2("foo.baz.inner").valueFrom(root));
    }

    @Test void shouldFindValueInStruct() {
        Schema leafSchema = SchemaBuilder.struct()
            .field("inner", Schema.STRING_SCHEMA)
            .optional()
            .build();
        Schema childSchema = SchemaBuilder.struct()
            .field("bar", Schema.INT32_SCHEMA)
            .field("baz", leafSchema)
            .build();
        Schema rootSchema = SchemaBuilder.struct().field("foo", childSchema).build();
        Struct child = new Struct(childSchema)
            .put("bar", 42)
            .put("baz", null);
        Struct root = new Struct(rootSchema).put("foo", child);
        assertEquals(42, pathV2("foo.bar").valueFrom(root));
        // an optional nested struct set to null resolves to null
        assertNull(pathV2("foo.baz").valueFrom(root));
    }

    @Test void shouldReturnNullValueWhenFieldNotFoundInStruct() {
        Schema leafSchema = SchemaBuilder.struct()
            .field("inner", Schema.STRING_SCHEMA)
            .optional()
            .build();
        Schema childSchema = SchemaBuilder.struct()
            .field("bar", Schema.INT32_SCHEMA)
            .field("baz", leafSchema)
            .build();
        Schema rootSchema = SchemaBuilder.struct().field("foo", childSchema).build();
        Struct child = new Struct(childSchema)
            .put("bar", 42)
            .put("baz", null);
        Struct root = new Struct(rootSchema).put("foo", child);
        // unknown steps at any depth, and paths through a null struct, all yield null
        assertNull(pathV2("un.known").valueFrom(root));
        assertNull(pathV2("foo.unknown").valueFrom(root));
        assertNull(pathV2("unknown").valueFrom(root));
        assertNull(pathV2("foo.baz.inner").valueFrom(root));
    }
}