KAFKA-13328, KAFKA-13329 (2): Add custom preflight validation support for connector header, key, and value converters (#14309)

Reviewers:  Greg Harris <greg.harris@aiven.io>
This commit is contained in:
Chris Egerton 2024-05-07 17:30:57 +02:00 committed by GitHub
parent 21bf715622
commit 05df10449e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 311 additions and 37 deletions

View File

@ -52,6 +52,8 @@ import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.ConverterConfig;
import org.apache.kafka.connect.storage.ConverterType;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.transforms.Transformation;
@ -79,6 +81,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@ -90,6 +93,10 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.apache.kafka.connect.runtime.ConnectorConfig.HEADER_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
/**
* Abstract Herder implementation which handles connector/task lifecycle tracking. Extensions
* must invoke the lifecycle hooks appropriately.
@ -392,6 +399,161 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
return configDef.validateAll(config);
}
/**
* General-purpose validation logic for converters that are configured directly
* in a connector config (as opposed to inherited from the worker config).
* @param connectorConfig the configuration for the connector; may not be null
* @param pluginConfigValue the {@link ConfigValue} for the converter property in the connector config;
* may be null, in which case no validation will be performed under the assumption that the
* connector will use inherit the converter settings from the worker. Some errors encountered
* during validation may be {@link ConfigValue#addErrorMessage(String) added} to this object
* @param pluginInterface the interface for the plugin type
* (e.g., {@code org.apache.kafka.connect.storage.Converter.class});
* may not be null
* @param configDefAccessor an accessor that can be used to retrieve a {@link ConfigDef}
* from an instance of the plugin type (e.g., {@code Converter::config});
* may not be null
* @param pluginName a lowercase, human-readable name for the type of plugin (e.g., {@code "key converter"});
* may not be null
* @param pluginProperty the property used to define a custom class for the plugin type
* in a connector config (e.g., {@link ConnectorConfig#KEY_CONVERTER_CLASS_CONFIG});
* may not be null
* @param defaultProperties any default properties to include in the configuration that will be used for
* the plugin; may be null
* @return a {@link ConfigInfos} object containing validation results for the plugin in the connector config,
* or null if either no custom validation was performed (possibly because no custom plugin was defined in the
* connector config), or if custom validation failed
* @param <T> the plugin class to perform validation for
*/
private <T> ConfigInfos validateConverterConfig(
Map<String, String> connectorConfig,
ConfigValue pluginConfigValue,
Class<T> pluginInterface,
Function<T, ConfigDef> configDefAccessor,
String pluginName,
String pluginProperty,
Map<String, String> defaultProperties,
Function<String, TemporaryStage> reportStage
) {
Objects.requireNonNull(connectorConfig);
Objects.requireNonNull(pluginInterface);
Objects.requireNonNull(configDefAccessor);
Objects.requireNonNull(pluginName);
Objects.requireNonNull(pluginProperty);
String pluginClass = connectorConfig.get(pluginProperty);
if (pluginClass == null
|| pluginConfigValue == null
|| !pluginConfigValue.errorMessages().isEmpty()
) {
// Either no custom converter was specified, or one was specified but there's a problem with it.
// No need to proceed any further.
return null;
}
T pluginInstance;
String stageDescription = "instantiating the connector's " + pluginName + " for validation";
try (TemporaryStage stage = reportStage.apply(stageDescription)) {
pluginInstance = Utils.newInstance(pluginClass, pluginInterface);
} catch (ClassNotFoundException | RuntimeException e) {
log.error("Failed to instantiate {} class {}; this should have been caught by prior validation logic", pluginName, pluginClass, e);
pluginConfigValue.addErrorMessage("Failed to load class " + pluginClass + (e.getMessage() != null ? ": " + e.getMessage() : ""));
return null;
}
try {
ConfigDef configDef;
stageDescription = "retrieving the configuration definition from the connector's " + pluginName;
try (TemporaryStage stage = reportStage.apply(stageDescription)) {
configDef = configDefAccessor.apply(pluginInstance);
} catch (RuntimeException e) {
log.error("Failed to load ConfigDef from {} of type {}", pluginName, pluginClass, e);
pluginConfigValue.addErrorMessage("Failed to load ConfigDef from " + pluginName + (e.getMessage() != null ? ": " + e.getMessage() : ""));
return null;
}
if (configDef == null) {
log.warn("{}.config() has returned a null ConfigDef; no further preflight config validation for this converter will be performed", pluginClass);
// Older versions of Connect didn't do any converter validation.
// Even though converters are technically required to return a non-null ConfigDef object from their config() method,
// we permit this case in order to avoid breaking existing converters that, despite not adhering to this requirement,
// can be used successfully with a connector.
return null;
}
final String pluginPrefix = pluginProperty + ".";
Map<String, String> pluginConfig = Utils.entriesWithPrefix(connectorConfig, pluginPrefix);
if (defaultProperties != null)
defaultProperties.forEach(pluginConfig::putIfAbsent);
List<ConfigValue> configValues;
stageDescription = "performing config validation for the connector's " + pluginName;
try (TemporaryStage stage = reportStage.apply(stageDescription)) {
configValues = configDef.validate(pluginConfig);
} catch (RuntimeException e) {
log.error("Failed to perform config validation for {} of type {}", pluginName, pluginClass, e);
pluginConfigValue.addErrorMessage("Failed to perform config validation for " + pluginName + (e.getMessage() != null ? ": " + e.getMessage() : ""));
return null;
}
return prefixedConfigInfos(configDef.configKeys(), configValues, pluginPrefix);
} finally {
Utils.maybeCloseQuietly(pluginInstance, pluginName + " " + pluginClass);
}
}
private ConfigInfos validateHeaderConverterConfig(
Map<String, String> connectorConfig,
ConfigValue headerConverterConfigValue,
Function<String, TemporaryStage> reportStage
) {
return validateConverterConfig(
connectorConfig,
headerConverterConfigValue,
HeaderConverter.class,
HeaderConverter::config,
"header converter",
HEADER_CONVERTER_CLASS_CONFIG,
Collections.singletonMap(ConverterConfig.TYPE_CONFIG, ConverterType.HEADER.getName()),
reportStage
);
}
private ConfigInfos validateKeyConverterConfig(
Map<String, String> connectorConfig,
ConfigValue keyConverterConfigValue,
Function<String, TemporaryStage> reportStage
) {
return validateConverterConfig(
connectorConfig,
keyConverterConfigValue,
Converter.class,
Converter::config,
"key converter",
KEY_CONVERTER_CLASS_CONFIG,
Collections.singletonMap(ConverterConfig.TYPE_CONFIG, ConverterType.KEY.getName()),
reportStage
);
}
private ConfigInfos validateValueConverterConfig(
Map<String, String> connectorConfig,
ConfigValue valueConverterConfigValue,
Function<String, TemporaryStage> reportStage
) {
return validateConverterConfig(
connectorConfig,
valueConverterConfigValue,
Converter.class,
Converter::config,
"value converter",
VALUE_CONVERTER_CLASS_CONFIG,
Collections.singletonMap(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName()),
reportStage
);
}
@Override
public void validateConnectorConfig(Map<String, String> connectorProps, Callback<ConfigInfos> callback) {
validateConnectorConfig(connectorProps, callback, true);
@ -562,8 +724,25 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
configKeys.putAll(configDef.configKeys());
allGroups.addAll(configDef.groups());
configValues.addAll(config.configValues());
ConfigInfos configInfos = generateResult(connType, configKeys, configValues, new ArrayList<>(allGroups));
// do custom converter-specific validation
ConfigInfos headerConverterConfigInfos = validateHeaderConverterConfig(
connectorProps,
validatedConnectorConfig.get(HEADER_CONVERTER_CLASS_CONFIG),
reportStage
);
ConfigInfos keyConverterConfigInfos = validateKeyConverterConfig(
connectorProps,
validatedConnectorConfig.get(KEY_CONVERTER_CLASS_CONFIG),
reportStage
);
ConfigInfos valueConverterConfigInfos = validateValueConverterConfig(
connectorProps,
validatedConnectorConfig.get(VALUE_CONVERTER_CLASS_CONFIG),
reportStage
);
ConfigInfos configInfos = generateResult(connType, configKeys, configValues, new ArrayList<>(allGroups));
AbstractConfig connectorConfig = new AbstractConfig(new ConfigDef(), connectorProps, doLog);
String connName = connectorProps.get(ConnectorConfig.NAME_CONFIG);
ConfigInfos producerConfigInfos = null;
@ -612,7 +791,15 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
connectorClientConfigOverridePolicy);
}
}
return mergeConfigInfos(connType, configInfos, producerConfigInfos, consumerConfigInfos, adminConfigInfos);
return mergeConfigInfos(connType,
configInfos,
producerConfigInfos,
consumerConfigInfos,
adminConfigInfos,
headerConverterConfigInfos,
keyConverterConfigInfos,
valueConverterConfigInfos
);
}
}
@ -638,10 +825,6 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
org.apache.kafka.connect.health.ConnectorType connectorType,
ConnectorClientConfigRequest.ClientType clientType,
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
int errorCount = 0;
List<ConfigInfo> configInfoList = new LinkedList<>();
Map<String, ConfigKey> configKeys = configDef.configKeys();
Set<String> groups = new LinkedHashSet<>();
Map<String, Object> clientConfigs = new HashMap<>();
for (Map.Entry<String, Object> rawClientConfig : connectorConfig.originalsWithPrefix(prefix).entrySet()) {
String configName = rawClientConfig.getKey();
@ -655,27 +838,38 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
ConnectorClientConfigRequest connectorClientConfigRequest = new ConnectorClientConfigRequest(
connName, connectorType, connectorClass, clientConfigs, clientType);
List<ConfigValue> configValues = connectorClientConfigOverridePolicy.validate(connectorClientConfigRequest);
if (configValues != null) {
for (ConfigValue validatedConfigValue : configValues) {
ConfigKey configKey = configKeys.get(validatedConfigValue.name());
ConfigKeyInfo configKeyInfo = null;
if (configKey != null) {
if (configKey.group != null) {
groups.add(configKey.group);
}
configKeyInfo = convertConfigKey(configKey, prefix);
}
ConfigValue configValue = new ConfigValue(prefix + validatedConfigValue.name(), validatedConfigValue.value(),
validatedConfigValue.recommendedValues(), validatedConfigValue.errorMessages());
if (!configValue.errorMessages().isEmpty()) {
errorCount++;
}
ConfigValueInfo configValueInfo = convertConfigValue(configValue, configKey != null ? configKey.type : null);
configInfoList.add(new ConfigInfo(configKeyInfo, configValueInfo));
}
return prefixedConfigInfos(configDef.configKeys(), configValues, prefix);
}
private static ConfigInfos prefixedConfigInfos(Map<String, ConfigKey> configKeys, List<ConfigValue> configValues, String prefix) {
int errorCount = 0;
Set<String> groups = new LinkedHashSet<>();
List<ConfigInfo> configInfos = new ArrayList<>();
if (configValues == null) {
return new ConfigInfos("", 0, new ArrayList<>(groups), configInfos);
}
return new ConfigInfos(connectorClass.toString(), errorCount, new ArrayList<>(groups), configInfoList);
for (ConfigValue validatedConfigValue : configValues) {
ConfigKey configKey = configKeys.get(validatedConfigValue.name());
ConfigKeyInfo configKeyInfo = null;
if (configKey != null) {
if (configKey.group != null) {
groups.add(configKey.group);
}
configKeyInfo = convertConfigKey(configKey, prefix);
}
ConfigValue configValue = new ConfigValue(prefix + validatedConfigValue.name(), validatedConfigValue.value(),
validatedConfigValue.recommendedValues(), validatedConfigValue.errorMessages());
if (configValue.errorMessages().size() > 0) {
errorCount++;
}
ConfigValueInfo configValueInfo = convertConfigValue(configValue, configKey != null ? configKey.type : null);
configInfos.add(new ConfigInfo(configKeyInfo, configValueInfo));
}
return new ConfigInfos("", errorCount, new ArrayList<>(groups), configInfos);
}
// public for testing

View File

@ -529,13 +529,13 @@ public class ConnectorConfig extends AbstractConfig {
}
Utils.ensureConcreteSubclass(baseClass, cls);
T transformation;
T pluginInstance;
try {
transformation = Utils.newInstance(cls, baseClass);
pluginInstance = Utils.newInstance(cls, baseClass);
} catch (Exception e) {
throw new ConfigException(key, String.valueOf(cls), "Error getting config definition from " + baseClass.getSimpleName() + ": " + e.getMessage());
}
ConfigDef configDef = config(transformation);
ConfigDef configDef = config(pluginInstance);
if (null == configDef) {
throw new ConnectException(
String.format(

View File

@ -324,7 +324,8 @@ public class ConnectorValidationIntegrationTest {
config.get(CONNECTOR_CLASS_CONFIG),
config,
1,
"Connector config should fail preflight validation when an abstract converter class is specified"
"Connector config should fail preflight validation when an abstract converter class is specified",
0
);
}
@ -336,7 +337,8 @@ public class ConnectorValidationIntegrationTest {
config.get(CONNECTOR_CLASS_CONFIG),
config,
1,
"Connector config should fail preflight validation when a converter class with no suitable constructor is specified"
"Connector config should fail preflight validation when a converter class with no suitable constructor is specified",
0
);
}
@ -348,7 +350,35 @@ public class ConnectorValidationIntegrationTest {
config.get(CONNECTOR_CLASS_CONFIG),
config,
1,
"Connector config should fail preflight validation when a converter class that throws an exception on instantiation is specified"
"Connector config should fail preflight validation when a converter class that throws an exception on instantiation is specified",
0
);
}
@Test
public void testConnectorHasMisconfiguredConverter() throws InterruptedException {
Map<String, String> config = defaultSinkConnectorProps();
config.put(KEY_CONVERTER_CLASS_CONFIG, TestConverterWithSinglePropertyConfigDef.class.getName());
config.put(KEY_CONVERTER_CLASS_CONFIG + "." + TestConverterWithSinglePropertyConfigDef.BOOLEAN_PROPERTY_NAME, "notaboolean");
connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
config.get(CONNECTOR_CLASS_CONFIG),
config,
1,
"Connector config should fail preflight validation when a converter fails custom validation",
0
);
}
@Test
public void testConnectorHasConverterWithNoConfigDef() throws InterruptedException {
Map<String, String> config = defaultSinkConnectorProps();
config.put(KEY_CONVERTER_CLASS_CONFIG, TestConverterWithNoConfigDef.class.getName());
connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
config.get(CONNECTOR_CLASS_CONFIG),
config,
0,
"Connector config should not fail preflight validation even when a converter provides a null ConfigDef",
0
);
}
@ -373,7 +403,8 @@ public class ConnectorValidationIntegrationTest {
config.get(CONNECTOR_CLASS_CONFIG),
config,
1,
"Connector config should fail preflight validation when a header converter with a class of the wrong type is specified"
"Connector config should fail preflight validation when a header converter with a class of the wrong type is specified",
0
);
}
@ -385,7 +416,8 @@ public class ConnectorValidationIntegrationTest {
config.get(CONNECTOR_CLASS_CONFIG),
config,
1,
"Connector config should fail preflight validation when an abstract header converter class is specified"
"Connector config should fail preflight validation when an abstract header converter class is specified",
0
);
}
@ -397,7 +429,8 @@ public class ConnectorValidationIntegrationTest {
config.get(CONNECTOR_CLASS_CONFIG),
config,
1,
"Connector config should fail preflight validation when a header converter class with no suitable constructor is specified"
"Connector config should fail preflight validation when a header converter class with no suitable constructor is specified",
0
);
}
@ -409,7 +442,35 @@ public class ConnectorValidationIntegrationTest {
config.get(CONNECTOR_CLASS_CONFIG),
config,
1,
"Connector config should fail preflight validation when a header converter class that throws an exception on instantiation is specified"
"Connector config should fail preflight validation when a header converter class that throws an exception on instantiation is specified",
0
);
}
@Test
public void testConnectorHasMisconfiguredHeaderConverter() throws InterruptedException {
Map<String, String> config = defaultSinkConnectorProps();
config.put(HEADER_CONVERTER_CLASS_CONFIG, TestConverterWithSinglePropertyConfigDef.class.getName());
config.put(HEADER_CONVERTER_CLASS_CONFIG + "." + TestConverterWithSinglePropertyConfigDef.BOOLEAN_PROPERTY_NAME, "notaboolean");
connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
config.get(CONNECTOR_CLASS_CONFIG),
config,
1,
"Connector config should fail preflight validation when a header converter fails custom validation",
0
);
}
@Test
public void testConnectorHasHeaderConverterWithNoConfigDef() throws InterruptedException {
Map<String, String> config = defaultSinkConnectorProps();
config.put(HEADER_CONVERTER_CLASS_CONFIG, TestConverterWithNoConfigDef.class.getName());
connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
config.get(CONNECTOR_CLASS_CONFIG),
config,
0,
"Connector config should not fail preflight validation even when a header converter provides a null ConfigDef",
0
);
}
@ -470,6 +531,21 @@ public class ConnectorValidationIntegrationTest {
}
}
public static class TestConverterWithSinglePropertyConfigDef extends TestConverter {
public static final String BOOLEAN_PROPERTY_NAME = "prop";
@Override
public ConfigDef config() {
return new ConfigDef().define(BOOLEAN_PROPERTY_NAME, ConfigDef.Type.BOOLEAN, ConfigDef.Importance.HIGH, "");
}
}
public static class TestConverterWithNoConfigDef extends TestConverter {
@Override
public ConfigDef config() {
return null;
}
}
private Map<String, String> defaultSourceConnectorProps() {
// setup up props for the source connector
Map<String, String> props = new HashMap<>();

View File

@ -18,3 +18,5 @@ org.apache.kafka.connect.runtime.ErrorHandlingTaskTest$FaultyConverter
org.apache.kafka.connect.runtime.isolation.PluginsTest$TestConverter
org.apache.kafka.connect.runtime.isolation.PluginsTest$TestInternalConverter
org.apache.kafka.connect.runtime.isolation.PluginUtilsTest$CollidingConverter
org.apache.kafka.connect.integration.ConnectorValidationIntegrationTest$TestConverterWithSinglePropertyConfigDef
org.apache.kafka.connect.integration.ConnectorValidationIntegrationTest$TestConverterWithNoConfigDef

View File

@ -18,3 +18,5 @@ org.apache.kafka.connect.runtime.ErrorHandlingTaskTest$FaultyConverter
org.apache.kafka.connect.runtime.isolation.PluginsTest$TestHeaderConverter
org.apache.kafka.connect.runtime.isolation.PluginsTest$TestInternalConverter
org.apache.kafka.connect.runtime.isolation.PluginUtilsTest$CollidingHeaderConverter
org.apache.kafka.connect.integration.ConnectorValidationIntegrationTest$TestConverterWithSinglePropertyConfigDef
org.apache.kafka.connect.integration.ConnectorValidationIntegrationTest$TestConverterWithNoConfigDef