KAFKA-13328, KAFKA-13329 (1): Add preflight validations for key, value, and header converter classes (#14304)

Reviewers: Yash Mayya <yash.mayya@gmail.com>, Greg Harris <greg.harris@aiven.io>
This commit is contained in:
Chris Egerton 2023-12-08 15:00:44 -05:00 committed by GitHub
parent 9de72daa50
commit dc857fb6bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 337 additions and 41 deletions

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.utils;
import java.lang.reflect.Modifier;
import java.nio.BufferUnderflowException;
import java.nio.ByteOrder;
import java.nio.file.StandardOpenOption;
@ -1108,6 +1109,15 @@ public final class Utils {
void close();
}
/**
* Closes {@code maybeCloseable} if it implements the {@link AutoCloseable} interface,
* and if an exception is thrown, it is logged at the WARN level.
*/
public static void maybeCloseQuietly(Object maybeCloseable, String name) {
if (maybeCloseable instanceof AutoCloseable)
closeQuietly((AutoCloseable) maybeCloseable, name);
}
/**
* Closes {@code closeable} and if an exception is thrown, it is logged at the WARN level.
* <b>Be cautious when passing method references as an argument.</b> For example:
@ -1576,6 +1586,38 @@ public final class Utils {
.toArray(String[]::new);
}
/**
* Ensure that the class is concrete (i.e., not abstract), and that it subclasses a given base class.
* If it is abstract or does not subclass the given base class, throw a {@link ConfigException}
* with a friendly error message suggesting a list of concrete child subclasses (if any are known).
* @param baseClass the expected superclass; may not be null
* @param klass the class to check; may not be null
* @throws ConfigException if the class is not concrete
*/
public static void ensureConcreteSubclass(Class<?> baseClass, Class<?> klass) {
Objects.requireNonNull(baseClass);
Objects.requireNonNull(klass);
if (!baseClass.isAssignableFrom(klass)) {
String inheritFrom = baseClass.isInterface() ? "implement" : "extend";
String baseClassType = baseClass.isInterface() ? "interface" : "class";
throw new ConfigException("Class " + klass + " does not " + inheritFrom + " the " + baseClass.getSimpleName() + " " + baseClassType);
}
if (Modifier.isAbstract(klass.getModifiers())) {
String childClassNames = Stream.of(klass.getClasses())
.filter(baseClass::isAssignableFrom)
.filter(c -> !Modifier.isAbstract(c.getModifiers()))
.filter(c -> Modifier.isPublic(c.getModifiers()))
.map(Class::getName)
.collect(Collectors.joining(", "));
String message = "This class is abstract and cannot be created.";
if (!Utils.isBlank(childClassNames))
message += " Did you mean " + childClassNames + "?";
throw new ConfigException(message);
}
}
/**
* Convert time instant to readable string for logging
* @param timestamp the timestamp of the instant to be converted.

View File

@ -28,12 +28,15 @@ import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.errors.ToleranceType;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import org.apache.kafka.connect.util.ConcreteSubClassValidator;
import org.apache.kafka.connect.util.InstantiableClassValidator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@ -42,7 +45,6 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars;
@ -82,10 +84,18 @@ public class ConnectorConfig extends AbstractConfig {
public static final String KEY_CONVERTER_CLASS_CONFIG = WorkerConfig.KEY_CONVERTER_CLASS_CONFIG;
public static final String KEY_CONVERTER_CLASS_DOC = WorkerConfig.KEY_CONVERTER_CLASS_DOC;
public static final String KEY_CONVERTER_CLASS_DISPLAY = "Key converter class";
private static final ConfigDef.Validator KEY_CONVERTER_CLASS_VALIDATOR = ConfigDef.CompositeValidator.of(
ConcreteSubClassValidator.forSuperClass(Converter.class),
new InstantiableClassValidator()
);
public static final String VALUE_CONVERTER_CLASS_CONFIG = WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG;
public static final String VALUE_CONVERTER_CLASS_DOC = WorkerConfig.VALUE_CONVERTER_CLASS_DOC;
public static final String VALUE_CONVERTER_CLASS_DISPLAY = "Value converter class";
private static final ConfigDef.Validator VALUE_CONVERTER_CLASS_VALIDATOR = ConfigDef.CompositeValidator.of(
ConcreteSubClassValidator.forSuperClass(Converter.class),
new InstantiableClassValidator()
);
public static final String HEADER_CONVERTER_CLASS_CONFIG = WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG;
public static final String HEADER_CONVERTER_CLASS_DOC = WorkerConfig.HEADER_CONVERTER_CLASS_DOC;
@ -93,6 +103,10 @@ public class ConnectorConfig extends AbstractConfig {
// The Connector config should not have a default for the header converter, since the absence of a config property means that
// the worker config settings should be used. Thus, we set the default to null here.
public static final String HEADER_CONVERTER_CLASS_DEFAULT = null;
private static final ConfigDef.Validator HEADER_CONVERTER_CLASS_VALIDATOR = ConfigDef.CompositeValidator.of(
ConcreteSubClassValidator.forSuperClass(HeaderConverter.class),
new InstantiableClassValidator()
);
public static final String TASKS_MAX_CONFIG = "tasks.max";
private static final String TASKS_MAX_DOC = "Maximum number of tasks to use for this connector.";
@ -181,9 +195,9 @@ public class ConnectorConfig extends AbstractConfig {
.define(NAME_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, nonEmptyStringWithoutControlChars(), Importance.HIGH, NAME_DOC, COMMON_GROUP, ++orderInGroup, Width.MEDIUM, NAME_DISPLAY)
.define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.LONG, CONNECTOR_CLASS_DISPLAY)
.define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, TASK_MAX_DISPLAY)
.define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, KEY_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, KEY_CONVERTER_CLASS_DISPLAY)
.define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, VALUE_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, VALUE_CONVERTER_CLASS_DISPLAY)
.define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS, HEADER_CONVERTER_CLASS_DEFAULT, Importance.LOW, HEADER_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, HEADER_CONVERTER_CLASS_DISPLAY)
.define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, null, KEY_CONVERTER_CLASS_VALIDATOR, Importance.LOW, KEY_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, KEY_CONVERTER_CLASS_DISPLAY)
.define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, null, VALUE_CONVERTER_CLASS_VALIDATOR, Importance.LOW, VALUE_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, VALUE_CONVERTER_CLASS_DISPLAY)
.define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS, HEADER_CONVERTER_CLASS_DEFAULT, HEADER_CONVERTER_CLASS_VALIDATOR, Importance.LOW, HEADER_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, HEADER_CONVERTER_CLASS_DISPLAY)
.define(TRANSFORMS_CONFIG, Type.LIST, Collections.emptyList(), aliasValidator("transformation"), Importance.LOW, TRANSFORMS_DOC, TRANSFORMS_GROUP, ++orderInGroup, Width.LONG, TRANSFORMS_DISPLAY)
.define(PREDICATES_CONFIG, Type.LIST, Collections.emptyList(), aliasValidator("predicate"), Importance.LOW, PREDICATES_DOC, PREDICATES_GROUP, ++orderInGroup, Width.LONG, PREDICATES_DISPLAY)
.define(CONFIG_RELOAD_ACTION_CONFIG, Type.STRING, CONFIG_RELOAD_ACTION_RESTART,
@ -491,21 +505,11 @@ public class ConnectorConfig extends AbstractConfig {
* @param cls The subclass of the baseclass.
*/
ConfigDef getConfigDefFromConfigProvidingClass(String key, Class<?> cls) {
if (cls == null || !baseClass.isAssignableFrom(cls)) {
throw new ConfigException(key, String.valueOf(cls), "Not a " + baseClass.getSimpleName());
}
if (Modifier.isAbstract(cls.getModifiers())) {
String childClassNames = Stream.of(cls.getClasses())
.filter(cls::isAssignableFrom)
.filter(c -> !Modifier.isAbstract(c.getModifiers()))
.filter(c -> Modifier.isPublic(c.getModifiers()))
.map(Class::getName)
.collect(Collectors.joining(", "));
String message = Utils.isBlank(childClassNames) ?
aliasKind + " is abstract and cannot be created." :
aliasKind + " is abstract and cannot be created. Did you mean " + childClassNames + "?";
throw new ConfigException(key, String.valueOf(cls), message);
if (cls == null) {
throw new ConfigException(key, null, "Not a " + baseClass.getSimpleName());
}
Utils.ensureConcreteSubclass(baseClass, cls);
T transformation;
try {
transformation = Utils.newInstance(cls, baseClass);

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.util;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.Utils;
public class ConcreteSubClassValidator implements ConfigDef.Validator {
private final Class<?> expectedSuperClass;
private ConcreteSubClassValidator(Class<?> expectedSuperClass) {
this.expectedSuperClass = expectedSuperClass;
}
public static ConcreteSubClassValidator forSuperClass(Class<?> expectedSuperClass) {
return new ConcreteSubClassValidator(expectedSuperClass);
}
@Override
public void ensureValid(String name, Object value) {
if (value == null) {
// The value will be null if the class couldn't be found; no point in performing follow-up validation
return;
}
Class<?> cls = (Class<?>) value;
Utils.ensureConcreteSubclass(expectedSuperClass, cls);
}
@Override
public String toString() {
return "A concrete subclass of " + expectedSuperClass.getName();
}
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.util;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Utils;
public class InstantiableClassValidator implements ConfigDef.Validator {
@Override
public void ensureValid(String name, Object value) {
if (value == null) {
// The value will be null if the class couldn't be found; no point in performing follow-up validation
return;
}
Class<?> cls = (Class<?>) value;
try {
Object o = cls.getDeclaredConstructor().newInstance();
Utils.maybeCloseQuietly(o, o + " (instantiated for preflight validation");
} catch (NoSuchMethodException | IllegalAccessException e) {
throw new ConfigException(name, cls.getName(), "Could not find a public no-argument constructor for class" + (e.getMessage() != null ? ": " + e.getMessage() : ""));
} catch (ReflectiveOperationException | LinkageError | RuntimeException e) {
throw new ConfigException(name, cls.getName(), "Could not instantiate class" + (e.getMessage() != null ? ": " + e.getMessage() : ""));
}
}
@Override
public String toString() {
return "A class with a public, no-argument constructor";
}
}

View File

@ -16,7 +16,13 @@
*/
package org.apache.kafka.connect.integration;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.transforms.Filter;
import org.apache.kafka.connect.transforms.predicates.RecordIsTombstone;
@ -27,6 +33,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@ -296,6 +303,55 @@ public class ConnectorValidationIntegrationTest {
);
}
@Test
public void testConnectorHasInvalidConverterClassType() throws InterruptedException {
Map<String, String> config = defaultSinkConnectorProps();
config.put(KEY_CONVERTER_CLASS_CONFIG, MonitorableSinkConnector.class.getName());
connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
config.get(CONNECTOR_CLASS_CONFIG),
config,
1,
"Connector config should fail preflight validation when a converter with a class of the wrong type is specified",
0
);
}
@Test
public void testConnectorHasAbstractConverter() throws InterruptedException {
Map<String, String> config = defaultSinkConnectorProps();
config.put(KEY_CONVERTER_CLASS_CONFIG, AbstractTestConverter.class.getName());
connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
config.get(CONNECTOR_CLASS_CONFIG),
config,
1,
"Connector config should fail preflight validation when an abstract converter class is specified"
);
}
@Test
public void testConnectorHasConverterWithNoSuitableConstructor() throws InterruptedException {
Map<String, String> config = defaultSinkConnectorProps();
config.put(KEY_CONVERTER_CLASS_CONFIG, TestConverterWithPrivateConstructor.class.getName());
connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
config.get(CONNECTOR_CLASS_CONFIG),
config,
1,
"Connector config should fail preflight validation when a converter class with no suitable constructor is specified"
);
}
@Test
public void testConnectorHasConverterThatThrowsExceptionOnInstantiation() throws InterruptedException {
Map<String, String> config = defaultSinkConnectorProps();
config.put(KEY_CONVERTER_CLASS_CONFIG, TestConverterWithConstructorThatThrowsException.class.getName());
connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
config.get(CONNECTOR_CLASS_CONFIG),
config,
1,
"Connector config should fail preflight validation when a converter class that throws an exception on instantiation is specified"
);
}
@Test
public void testConnectorHasMissingHeaderConverterClass() throws InterruptedException {
Map<String, String> config = defaultSinkConnectorProps();
@ -309,6 +365,111 @@ public class ConnectorValidationIntegrationTest {
);
}
@Test
public void testConnectorHasInvalidHeaderConverterClassType() throws InterruptedException {
Map<String, String> config = defaultSinkConnectorProps();
config.put(HEADER_CONVERTER_CLASS_CONFIG, MonitorableSinkConnector.class.getName());
connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
config.get(CONNECTOR_CLASS_CONFIG),
config,
1,
"Connector config should fail preflight validation when a header converter with a class of the wrong type is specified"
);
}
@Test
public void testConnectorHasAbstractHeaderConverter() throws InterruptedException {
Map<String, String> config = defaultSinkConnectorProps();
config.put(HEADER_CONVERTER_CLASS_CONFIG, AbstractTestConverter.class.getName());
connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
config.get(CONNECTOR_CLASS_CONFIG),
config,
1,
"Connector config should fail preflight validation when an abstract header converter class is specified"
);
}
@Test
public void testConnectorHasHeaderConverterWithNoSuitableConstructor() throws InterruptedException {
Map<String, String> config = defaultSinkConnectorProps();
config.put(HEADER_CONVERTER_CLASS_CONFIG, TestConverterWithPrivateConstructor.class.getName());
connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
config.get(CONNECTOR_CLASS_CONFIG),
config,
1,
"Connector config should fail preflight validation when a header converter class with no suitable constructor is specified"
);
}
@Test
public void testConnectorHasHeaderConverterThatThrowsExceptionOnInstantiation() throws InterruptedException {
Map<String, String> config = defaultSinkConnectorProps();
config.put(HEADER_CONVERTER_CLASS_CONFIG, TestConverterWithConstructorThatThrowsException.class.getName());
connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
config.get(CONNECTOR_CLASS_CONFIG),
config,
1,
"Connector config should fail preflight validation when a header converter class that throws an exception on instantiation is specified"
);
}
public static abstract class TestConverter implements Converter, HeaderConverter {
// Defined by both Converter and HeaderConverter interfaces
@Override
public ConfigDef config() {
return null;
}
// Defined by Converter interface
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public byte[] fromConnectData(String topic, Schema schema, Object value) {
return null;
}
@Override
public SchemaAndValue toConnectData(String topic, byte[] value) {
return null;
}
// Defined by HeaderConverter interface
@Override
public void close() throws IOException {
}
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) {
return null;
}
@Override
public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value) {
return new byte[0];
}
}
public static abstract class AbstractTestConverter extends TestConverter {
}
public static class TestConverterWithPrivateConstructor extends TestConverter {
private TestConverterWithPrivateConstructor() {
}
}
public static class TestConverterWithConstructorThatThrowsException extends TestConverter {
public TestConverterWithConstructorThatThrowsException() {
throw new ConnectException("whoops");
}
}
private Map<String, String> defaultSourceConnectorProps() {
// setup up props for the source connector
Map<String, String> props = new HashMap<>();

View File

@ -198,13 +198,10 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
props.put("connector.class", TestConnector.class.getName());
props.put("transforms", "a");
props.put("transforms.a.type", AbstractTransformation.class.getName());
try {
new ConnectorConfig(MOCK_PLUGINS, props);
} catch (ConfigException ex) {
assertTrue(
ex.getMessage().contains("Transformation is abstract and cannot be created.")
);
}
ConfigException ex = assertThrows(ConfigException.class, () -> new ConnectorConfig(MOCK_PLUGINS, props));
assertTrue(
ex.getMessage().contains("This class is abstract and cannot be created.")
);
}
@Test
public void abstractKeyValueTransform() {
@ -213,19 +210,16 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
props.put("connector.class", TestConnector.class.getName());
props.put("transforms", "a");
props.put("transforms.a.type", AbstractKeyValueTransformation.class.getName());
try {
new ConnectorConfig(MOCK_PLUGINS, props);
} catch (ConfigException ex) {
assertTrue(
ex.getMessage().contains("Transformation is abstract and cannot be created.")
);
assertTrue(
ex.getMessage().contains(AbstractKeyValueTransformation.Key.class.getName())
);
assertTrue(
ex.getMessage().contains(AbstractKeyValueTransformation.Value.class.getName())
);
}
ConfigException ex = assertThrows(ConfigException.class, () -> new ConnectorConfig(MOCK_PLUGINS, props));
assertTrue(
ex.getMessage().contains("This class is abstract and cannot be created.")
);
assertTrue(
ex.getMessage().contains(AbstractKeyValueTransformation.Key.class.getName())
);
assertTrue(
ex.getMessage().contains(AbstractKeyValueTransformation.Value.class.getName())
);
}
@Test
@ -240,7 +234,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
props.put("predicates", "my-pred");
props.put("predicates.my-pred.type", TestConnector.class.getName());
ConfigException e = assertThrows(ConfigException.class, () -> new ConnectorConfig(MOCK_PLUGINS, props));
assertTrue(e.getMessage().contains("Not a Predicate"));
assertEquals("Class " + TestConnector.class + " does not implement the Predicate interface", e.getMessage());
}
@Test
@ -287,7 +281,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
props.put("predicates.my-pred.type", AbstractTestPredicate.class.getName());
props.put("predicates.my-pred.int", "84");
ConfigException e = assertThrows(ConfigException.class, () -> new ConnectorConfig(MOCK_PLUGINS, props));
assertTrue(e.getMessage().contains("Predicate is abstract and cannot be created"));
assertTrue(e.getMessage().contains("This class is abstract and cannot be created"));
}
private void assertTransformationStageWithPredicate(Map<String, String> props, boolean expectedNegated) {