KAFKA-9673: Filter and Conditional SMTs (#8699)

Implemented KIP-585 to support Filter and Conditional SMTs. Added unit tests and integration tests.
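With this change a transformation alias can reference a predicate alias (optionally negated), and the new Filter SMT can then drop matching records. A minimal sketch of the configuration idiom, using illustrative alias names (dropBar, isBar); the key names are the ones introduced by this commit:

Map<String, String> props = new HashMap<>();
props.put("transforms", "dropBar");
props.put("transforms.dropBar.type", "org.apache.kafka.connect.transforms.Filter");
props.put("transforms.dropBar.predicate", "isBar");  // references the predicate alias below
props.put("predicates", "isBar");
props.put("predicates.isBar.type", "org.apache.kafka.connect.transforms.predicates.TopicNameMatches");
props.put("predicates.isBar.pattern", "bar-.*");     // drop records read from topics matching bar-.*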

Author: Tom Bentley <tbentley@redhat.com>
Reviewers: Randall Hauch <rhauch@gmail.com>, Konstantine Karantasis <konstantine@confluent.io>
Tom Bentley authored 2020-05-28 14:54:30 +01:00, committed by GitHub
parent 38c1e96d2c
commit 1c4eb1a575
24 changed files with 1712 additions and 139 deletions

View File

@ -342,6 +342,18 @@ public final class Utils {
return Class.forName(klass, true, Utils.getContextOrKafkaClassLoader()).asSubclass(base);
}
/**
* Cast {@code klass} to {@code base} and instantiate it.
* @param klass The class to instantiate
* @param base A known base class of {@code klass}.
* @param <T> the type of the base class
* @throws ClassCastException If {@code klass} is not a subclass of {@code base}.
* @return the new instance.
*/
public static <T> T newInstance(Class<?> klass, Class<T> base) {
return Utils.newInstance(klass.asSubclass(base));
}
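For illustration, a hedged usage sketch of the new overload, using the Connect Transformation interface as the base type (the Filter class named here is the one added elsewhere in this commit; Class.forName throws a checked ClassNotFoundException):

Class<?> cls = Class.forName("org.apache.kafka.connect.transforms.Filter");
Transformation<?> transformation = Utils.newInstance(cls, Transformation.class);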
/**
* Construct a new object using a class name and parameters.
*

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.transforms.predicates;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
/**
* <p>A predicate on records.
* Predicates can be used to conditionally apply a {@link org.apache.kafka.connect.transforms.Transformation}
* by configuring the transformation's {@code predicate} (and {@code negate}) configuration parameters.
* In particular, the {@code Filter} transformation can be conditionally applied in order to filter
* certain records from further processing.
*
* <p>Implementations of this interface must be public and have a public constructor with no parameters.
*
* @param <R> The type of record.
*/
public interface Predicate<R extends ConnectRecord<R>> extends Configurable, AutoCloseable {
/**
* Configuration specification for this predicate.
*
* @return the configuration definition for this predicate; never null
*/
ConfigDef config();
/**
* Returns whether the given record satisfies this predicate.
*
* @param record the record to evaluate; may not be null
* @return true if the predicate matches, or false otherwise
*/
boolean test(R record);
@Override
void close();
}
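To make the contract concrete, here is a minimal hypothetical implementation (not part of this commit) that matches records whose key is null, assuming the imports above plus java.util.Map:

public class KeyIsNull<R extends ConnectRecord<R>> implements Predicate<R> {
    @Override
    public ConfigDef config() {
        return new ConfigDef(); // no configuration parameters
    }
    @Override
    public boolean test(R record) {
        return record.key() == null; // matches records whose key is null
    }
    @Override
    public void close() {
        // nothing to clean up
    }
    @Override
    public void configure(Map<String, ?> configs) {
        // no configuration to apply
    }
}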

View File

@ -22,12 +22,16 @@ import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.errors.ToleranceType;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
@ -38,6 +42,7 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -57,8 +62,11 @@ import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
* </p>
*/
public class ConnectorConfig extends AbstractConfig {
private static final Logger log = LoggerFactory.getLogger(ConnectorConfig.class);
protected static final String COMMON_GROUP = "Common";
protected static final String TRANSFORMS_GROUP = "Transforms";
protected static final String PREDICATES_GROUP = "Predicates";
protected static final String ERROR_GROUP = "Error Handling";
public static final String NAME_CONFIG = "name";
@ -98,6 +106,10 @@ public class ConnectorConfig extends AbstractConfig {
private static final String TRANSFORMS_DOC = "Aliases for the transformations to be applied to records.";
private static final String TRANSFORMS_DISPLAY = "Transforms";
public static final String PREDICATES_CONFIG = "predicates";
private static final String PREDICATES_DOC = "Aliases for the predicates used by transformations.";
private static final String PREDICATES_DISPLAY = "Predicates";
public static final String CONFIG_RELOAD_ACTION_CONFIG = "config.action.reload";
private static final String CONFIG_RELOAD_ACTION_DOC =
"The action that Connect should take on the connector when changes in external " +
@ -147,6 +159,7 @@ public class ConnectorConfig extends AbstractConfig {
public static final String CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX = "producer.override.";
public static final String CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX = "consumer.override.";
public static final String CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX = "admin.override.";
public static final String PREDICATES_PREFIX = "predicates.";
private final EnrichedConnectorConfig enrichedConfig;
private static class EnrichedConnectorConfig extends AbstractConfig {
@ -170,21 +183,8 @@ public class ConnectorConfig extends AbstractConfig {
.define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, KEY_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, KEY_CONVERTER_CLASS_DISPLAY)
.define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, VALUE_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, VALUE_CONVERTER_CLASS_DISPLAY)
.define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS, HEADER_CONVERTER_CLASS_DEFAULT, Importance.LOW, HEADER_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, HEADER_CONVERTER_CLASS_DISPLAY)
.define(TRANSFORMS_CONFIG, Type.LIST, Collections.emptyList(), ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(), new ConfigDef.Validator() {
@SuppressWarnings("unchecked")
@Override
public void ensureValid(String name, Object value) {
final List<String> transformAliases = (List<String>) value;
if (transformAliases.size() > new HashSet<>(transformAliases).size()) {
throw new ConfigException(name, value, "Duplicate alias provided.");
}
}
@Override
public String toString() {
return "unique transformation aliases";
}
}), Importance.LOW, TRANSFORMS_DOC, TRANSFORMS_GROUP, ++orderInGroup, Width.LONG, TRANSFORMS_DISPLAY)
.define(TRANSFORMS_CONFIG, Type.LIST, Collections.emptyList(), aliasValidator("transformation"), Importance.LOW, TRANSFORMS_DOC, TRANSFORMS_GROUP, ++orderInGroup, Width.LONG, TRANSFORMS_DISPLAY)
.define(PREDICATES_CONFIG, Type.LIST, Collections.emptyList(), aliasValidator("predicate"), Importance.LOW, PREDICATES_DOC, PREDICATES_GROUP, ++orderInGroup, Width.LONG, PREDICATES_DISPLAY)
.define(CONFIG_RELOAD_ACTION_CONFIG, Type.STRING, CONFIG_RELOAD_ACTION_RESTART,
in(CONFIG_RELOAD_ACTION_NONE, CONFIG_RELOAD_ACTION_RESTART), Importance.LOW,
CONFIG_RELOAD_ACTION_DOC, COMMON_GROUP, ++orderInGroup, Width.MEDIUM, CONFIG_RELOAD_ACTION_DISPLAY)
@ -201,6 +201,24 @@ public class ConnectorConfig extends AbstractConfig {
ERRORS_LOG_INCLUDE_MESSAGES_DOC, ERROR_GROUP, ++orderInErrorGroup, Width.SHORT, ERRORS_LOG_INCLUDE_MESSAGES_DISPLAY);
}
private static ConfigDef.CompositeValidator aliasValidator(String kind) {
return ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(), new ConfigDef.Validator() {
@SuppressWarnings("unchecked")
@Override
public void ensureValid(String name, Object value) {
final List<String> aliases = (List<String>) value;
if (aliases.size() > new HashSet<>(aliases).size()) {
throw new ConfigException(name, value, "Duplicate alias provided.");
}
}
@Override
public String toString() {
return "unique " + kind + " aliases";
}
});
}
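For example, a duplicate alias in either list now fails validation with the same message (hypothetical snippet):

props.put("transforms", "a,a");   // ensureValid throws ConfigException: "Duplicate alias provided."
props.put("predicates", "p,p");   // the same validator is now shared by both alias lists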
public ConnectorConfig(Plugins plugins) {
this(plugins, new HashMap<String, String>());
}
@ -257,12 +275,23 @@ public class ConnectorConfig extends AbstractConfig {
final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
for (String alias : transformAliases) {
final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
try {
@SuppressWarnings("unchecked")
final Transformation<R> transformation = getClass(prefix + "type").asSubclass(Transformation.class)
.getDeclaredConstructor().newInstance();
transformation.configure(originalsWithPrefix(prefix));
transformations.add(transformation);
final Transformation<R> transformation = Utils.newInstance(getClass(prefix + "type"), Transformation.class);
Map<String, Object> configs = originalsWithPrefix(prefix);
Object predicateAlias = configs.remove(PredicatedTransformation.PREDICATE_CONFIG);
Object negate = configs.remove(PredicatedTransformation.NEGATE_CONFIG);
transformation.configure(configs);
if (predicateAlias != null) {
String predicatePrefix = PREDICATES_PREFIX + predicateAlias + ".";
@SuppressWarnings("unchecked")
Predicate<R> predicate = Utils.newInstance(getClass(predicatePrefix + "type"), Predicate.class);
predicate.configure(originalsWithPrefix(predicatePrefix));
transformations.add(new PredicatedTransformation<>(predicate, negate == null ? false : Boolean.parseBoolean(negate.toString()), transformation));
} else {
transformations.add(transformation);
}
} catch (Exception e) {
throw new ConnectException(e);
}
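The net effect of the block above, sketched for one hypothetical alias pair: the predicate and negate keys are removed from the transformation's config map before configure() is called, and the predicate is configured from its own namespace.

// transforms=t0
// transforms.t0.type=org.apache.kafka.connect.transforms.Filter
// transforms.t0.predicate=p0    <- consumed here, never seen by Filter.configure()
// transforms.t0.negate=true     <- consumed here as well
// predicates=p0
// predicates.p0.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
// predicates.p0.pattern=foo-.*  <- passed to the predicate's configure()
// Result: transformations gets a PredicatedTransformation wrapping the predicate and the Filter.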
@ -276,116 +305,250 @@ public class ConnectorConfig extends AbstractConfig {
* <p>
* {@code requireFullConfig} specifies whether required config values that are missing should cause an exception to be thrown.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public static ConfigDef enrich(Plugins plugins, ConfigDef baseConfigDef, Map<String, String> props, boolean requireFullConfig) {
Object transformAliases = ConfigDef.parseType(TRANSFORMS_CONFIG, props.get(TRANSFORMS_CONFIG), Type.LIST);
if (!(transformAliases instanceof List)) {
return baseConfigDef;
}
ConfigDef newDef = new ConfigDef(baseConfigDef);
LinkedHashSet<?> uniqueTransformAliases = new LinkedHashSet<>((List<?>) transformAliases);
for (Object o : uniqueTransformAliases) {
if (!(o instanceof String)) {
throw new ConfigException("Item in " + TRANSFORMS_CONFIG + " property is not of "
+ "type String");
}
String alias = (String) o;
final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
final String group = TRANSFORMS_GROUP + ": " + alias;
int orderInGroup = 0;
final String transformationTypeConfig = prefix + "type";
final ConfigDef.Validator typeValidator = new ConfigDef.Validator() {
@Override
public void ensureValid(String name, Object value) {
getConfigDefFromTransformation(transformationTypeConfig, (Class) value);
}
};
newDef.define(transformationTypeConfig, Type.CLASS, ConfigDef.NO_DEFAULT_VALUE, typeValidator, Importance.HIGH,
"Class for the '" + alias + "' transformation.", group, orderInGroup++, Width.LONG, "Transformation type for " + alias,
Collections.<String>emptyList(), new TransformationClassRecommender(plugins));
final ConfigDef transformationConfigDef;
try {
final String className = props.get(transformationTypeConfig);
final Class<?> cls = (Class<?>) ConfigDef.parseType(transformationTypeConfig, className, Type.CLASS);
transformationConfigDef = getConfigDefFromTransformation(transformationTypeConfig, cls);
} catch (ConfigException e) {
if (requireFullConfig) {
throw e;
} else {
continue;
}
new EnrichablePlugin<Transformation<?>>("Transformation", TRANSFORMS_CONFIG, TRANSFORMS_GROUP, (Class) Transformation.class,
props, requireFullConfig) {
@SuppressWarnings("rawtypes")
@Override
protected Set<PluginDesc<Transformation<?>>> plugins() {
return (Set) plugins.transformations();
}
newDef.embed(prefix, group, orderInGroup, transformationConfigDef);
}
@Override
protected ConfigDef initialConfigDef() {
// All Transformations get these config parameters implicitly
return super.initialConfigDef()
.define(PredicatedTransformation.PREDICATE_CONFIG, Type.STRING, "", Importance.MEDIUM,
"The alias of a predicate used to determine whether to apply this transformation.")
.define(PredicatedTransformation.NEGATE_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM,
"Whether the configured predicate should be negated.");
}
@Override
protected Stream<Map.Entry<String, ConfigDef.ConfigKey>> configDefsForClass(String typeConfig) {
return super.configDefsForClass(typeConfig)
.filter(entry -> {
// The implicit parameters mask any from the transformer with the same name
if (PredicatedTransformation.PREDICATE_CONFIG.equals(entry.getKey())
|| PredicatedTransformation.NEGATE_CONFIG.equals(entry.getKey())) {
log.warn("Transformation config {} is masked by an implicit config of that name",
entry.getKey());
return false;
} else {
return true;
}
});
}
@Override
protected ConfigDef config(Transformation<?> transformation) {
return transformation.config();
}
@Override
protected void validateProps(String prefix) {
String prefixedNegate = prefix + PredicatedTransformation.NEGATE_CONFIG;
String prefixedPredicate = prefix + PredicatedTransformation.PREDICATE_CONFIG;
if (props.containsKey(prefixedNegate) &&
!props.containsKey(prefixedPredicate)) {
throw new ConfigException("Config '" + prefixedNegate + "' was provided " +
"but there is no config '" + prefixedPredicate + "' defining a predicate to be negated.");
}
}
}.enrich(newDef);
new EnrichablePlugin<Predicate<?>>("Predicate", PREDICATES_CONFIG, PREDICATES_GROUP,
(Class) Predicate.class, props, requireFullConfig) {
@Override
protected Set<PluginDesc<Predicate<?>>> plugins() {
return (Set) plugins.predicates();
}
@Override
protected ConfigDef config(Predicate<?> predicate) {
return predicate.config();
}
}.enrich(newDef);
return newDef;
}
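Concretely, for a transformation alias t0 bound to a predicate alias p0 (illustrative names), the enriched ConfigDef gains keys along these lines; the embedded keys depend on each plugin's own config():

// transforms.t0.type        Class, validated to name a Transformation
// transforms.t0.predicate   String, implicit parameter defined above
// transforms.t0.negate      Boolean, implicit parameter defined above
// transforms.t0.<key>       one per key of the transformation's config()
// predicates.p0.type        Class, validated to name a Predicate
// predicates.p0.<key>       one per key of the predicate's config()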
/**
* Return {@link ConfigDef} from {@code transformationCls}, which is expected to be a non-null {@code Class<Transformation>},
* by instantiating it and invoking {@link Transformation#config()}.
* An abstraction over "enrichable plugins" ({@link Transformation}s and {@link Predicate}s) used for computing the
* contribution to a connector's ConfigDef.
*
* This is not entirely elegant: although both plugin kinds use the same "alias prefix"
* configuration idiom, there are some differences, so the abstract method pattern is used
* to cope with them.
* @param <T> The type of plugin (either {@code Transformation} or {@code Predicate}).
*/
static ConfigDef getConfigDefFromTransformation(String key, Class<?> transformationCls) {
if (transformationCls == null || !Transformation.class.isAssignableFrom(transformationCls)) {
throw new ConfigException(key, String.valueOf(transformationCls), "Not a Transformation");
}
if (Modifier.isAbstract(transformationCls.getModifiers())) {
String childClassNames = Stream.of(transformationCls.getClasses())
.filter(transformationCls::isAssignableFrom)
.filter(c -> !Modifier.isAbstract(c.getModifiers()))
.filter(c -> Modifier.isPublic(c.getModifiers()))
.map(Class::getName)
.collect(Collectors.joining(", "));
String message = childClassNames.trim().isEmpty() ?
"Transformation is abstract and cannot be created." :
"Transformation is abstract and cannot be created. Did you mean " + childClassNames + "?";
throw new ConfigException(key, String.valueOf(transformationCls), message);
}
Transformation transformation;
try {
transformation = transformationCls.asSubclass(Transformation.class).getConstructor().newInstance();
} catch (Exception e) {
ConfigException exception = new ConfigException(key, String.valueOf(transformationCls), "Error getting config definition from Transformation: " + e.getMessage());
exception.initCause(e);
throw exception;
}
ConfigDef configDef = transformation.config();
if (null == configDef) {
throw new ConnectException(
String.format(
"%s.config() must return a ConfigDef that is not null.",
transformationCls.getName()
)
);
}
return configDef;
}
static abstract class EnrichablePlugin<T> {
/**
* Recommend bundled transformations.
*/
static final class TransformationClassRecommender implements ConfigDef.Recommender {
private final Plugins plugins;
private final String aliasKind;
private final String aliasConfig;
private final String aliasGroup;
private final Class<T> baseClass;
private final Map<String, String> props;
private final boolean requireFullConfig;
TransformationClassRecommender(Plugins plugins) {
this.plugins = plugins;
public EnrichablePlugin(
String aliasKind,
String aliasConfig, String aliasGroup, Class<T> baseClass,
Map<String, String> props, boolean requireFullConfig) {
this.aliasKind = aliasKind;
this.aliasConfig = aliasConfig;
this.aliasGroup = aliasGroup;
this.baseClass = baseClass;
this.props = props;
this.requireFullConfig = requireFullConfig;
}
@Override
public List<Object> validValues(String name, Map<String, Object> parsedConfig) {
List<Object> transformationPlugins = new ArrayList<>();
for (PluginDesc<Transformation> plugin : plugins.transformations()) {
transformationPlugins.add(plugin.pluginClass());
/** Add the configs for each configured alias to the given {@code ConfigDef}. */
void enrich(ConfigDef newDef) {
Object aliases = ConfigDef.parseType(aliasConfig, props.get(aliasConfig), Type.LIST);
if (!(aliases instanceof List)) {
return;
}
LinkedHashSet<?> uniqueAliases = new LinkedHashSet<>((List<?>) aliases);
for (Object o : uniqueAliases) {
if (!(o instanceof String)) {
throw new ConfigException("Item in " + aliasConfig + " property is not of "
+ "type String");
}
String alias = (String) o;
final String prefix = aliasConfig + "." + alias + ".";
final String group = aliasGroup + ": " + alias;
int orderInGroup = 0;
final String typeConfig = prefix + "type";
final ConfigDef.Validator typeValidator = ConfigDef.LambdaValidator.with(
(String name, Object value) -> {
validateProps(prefix);
getConfigDefFromConfigProvidingClass(typeConfig, (Class<?>) value);
},
() -> "valid configs for " + alias + " " + aliasKind.toLowerCase(Locale.ENGLISH));
newDef.define(typeConfig, Type.CLASS, ConfigDef.NO_DEFAULT_VALUE, typeValidator, Importance.HIGH,
"Class for the '" + alias + "' " + aliasKind.toLowerCase(Locale.ENGLISH) + ".", group, orderInGroup++, Width.LONG,
baseClass.getSimpleName() + " type for " + alias,
Collections.emptyList(), new ClassRecommender());
final ConfigDef configDef = populateConfigDef(typeConfig);
if (configDef == null) continue;
newDef.embed(prefix, group, orderInGroup, configDef);
}
return Collections.unmodifiableList(transformationPlugins);
}
@Override
public boolean visible(String name, Map<String, Object> parsedConfig) {
return true;
/** Subclasses can add extra validation of the {@link #props}. */
protected void validateProps(String prefix) { }
/**
* Populates the ConfigDef according to the configs returned from the {@code config()} method of the class
* named in the {@code ...type} parameter of the {@code props}.
*/
protected ConfigDef populateConfigDef(String typeConfig) {
final ConfigDef configDef = initialConfigDef();
try {
configDefsForClass(typeConfig)
.forEach(entry -> configDef.define(entry.getValue()));
} catch (ConfigException e) {
if (requireFullConfig) {
throw e;
} else {
return null;
}
}
return configDef;
}
/**
* Return a stream of configs provided by the {@code config()} method of the class
* named in the {@code ...type} parameter of the {@code props}.
*/
protected Stream<Map.Entry<String, ConfigDef.ConfigKey>> configDefsForClass(String typeConfig) {
final Class<?> cls = (Class<?>) ConfigDef.parseType(typeConfig, props.get(typeConfig), Type.CLASS);
return getConfigDefFromConfigProvidingClass(typeConfig, cls)
.configKeys().entrySet().stream();
}
/** Get an initial ConfigDef */
protected ConfigDef initialConfigDef() {
return new ConfigDef();
}
/**
* Return the {@link ConfigDef} from {@code cls}, which is expected to be a non-null subclass of the base class,
* by instantiating it and invoking {@link #config}.
* @param key The name of the config key naming the class (used in error messages).
* @param cls The subclass of the base class.
*/
ConfigDef getConfigDefFromConfigProvidingClass(String key, Class<?> cls) {
if (cls == null || !baseClass.isAssignableFrom(cls)) {
throw new ConfigException(key, String.valueOf(cls), "Not a " + baseClass.getSimpleName());
}
if (Modifier.isAbstract(cls.getModifiers())) {
String childClassNames = Stream.of(cls.getClasses())
.filter(cls::isAssignableFrom)
.filter(c -> !Modifier.isAbstract(c.getModifiers()))
.filter(c -> Modifier.isPublic(c.getModifiers()))
.map(Class::getName)
.collect(Collectors.joining(", "));
String message = childClassNames.trim().isEmpty() ?
aliasKind + " is abstract and cannot be created." :
aliasKind + " is abstract and cannot be created. Did you mean " + childClassNames + "?";
throw new ConfigException(key, String.valueOf(cls), message);
}
T plugin;
try {
plugin = Utils.newInstance(cls, baseClass);
} catch (Exception e) {
throw new ConfigException(key, String.valueOf(cls), "Error getting config definition from " + baseClass.getSimpleName() + ": " + e.getMessage());
}
ConfigDef configDef = config(plugin);
if (null == configDef) {
throw new ConnectException(
String.format(
"%s.config() must return a ConfigDef that is not null.",
cls.getName()
)
);
}
return configDef;
}
/**
* Get the ConfigDef from the given entity.
* This is necessary because there's no abstraction across {@link Transformation#config()} and
* {@link Predicate#config()}.
*/
protected abstract ConfigDef config(T t);
/**
* The transformation or predicate plugins (as appropriate for T) to be used
* for the {@link ClassRecommender}.
*/
protected abstract Set<PluginDesc<T>> plugins();
/**
* Recommend bundled transformations or predicates.
*/
final class ClassRecommender implements ConfigDef.Recommender {
@Override
public List<Object> validValues(String name, Map<String, Object> parsedConfig) {
List<Object> result = new ArrayList<>();
for (PluginDesc<T> plugin : plugins()) {
result.add(plugin.pluginClass());
}
return Collections.unmodifiableList(result);
}
@Override
public boolean visible(String name, Map<String, Object> parsedConfig) {
return true;
}
}
}

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
/**
* Decorator for a {@link Transformation} which applies the delegate only when a
* {@link Predicate} matches (or does not match, when {@code negate} is true).
* @param <R> The type of record.
*/
class PredicatedTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
static final String PREDICATE_CONFIG = "predicate";
static final String NEGATE_CONFIG = "negate";
Predicate<R> predicate;
Transformation<R> delegate;
boolean negate;
PredicatedTransformation(Predicate<R> predicate, boolean negate, Transformation<R> delegate) {
this.predicate = predicate;
this.negate = negate;
this.delegate = delegate;
}
@Override
public void configure(Map<String, ?> configs) {
throw new ConnectException(PredicatedTransformation.class.getName() + ".configure() " +
"should never be called directly.");
}
@Override
public R apply(R record) {
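// XOR semantics: apply the delegate when the predicate matches (negate == false),
// or when it does not match (negate == true); otherwise pass the record through.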
if (negate ^ predicate.test(record)) {
return delegate.apply(record);
}
return record;
}
@Override
public ConfigDef config() {
throw new ConnectException(PredicatedTransformation.class.getName() + ".config() " +
"should never be called directly.");
}
@Override
public void close() {
Utils.closeQuietly(delegate, "predicated");
Utils.closeQuietly(predicate, "predicate");
}
@Override
public String toString() {
return "PredicatedTransformation{" +
"predicate=" + predicate +
", delegate=" + delegate +
", negate=" + negate +
'}';
}
}
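A hedged usage sketch; PredicatedTransformation is package-private and is normally constructed by ConnectorConfig from the transforms.* and predicates.* configuration, so direct construction here is purely illustrative:

Predicate<SourceRecord> isTombstone = new RecordIsTombstone<>();
isTombstone.configure(Collections.emptyMap());
Transformation<SourceRecord> filter = new Filter<>();
filter.configure(Collections.emptyMap());
Transformation<SourceRecord> dropTombstones =
        new PredicatedTransformation<>(isTombstone, false, filter);
// apply(): tombstones (null-value records) reach the Filter and are dropped (null is returned);
// all other records pass through unchanged.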

View File

@ -24,6 +24,7 @@ import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import org.reflections.Configuration;
import org.reflections.Reflections;
import org.reflections.ReflectionsException;
@ -72,6 +73,7 @@ public class DelegatingClassLoader extends URLClassLoader {
private final SortedSet<PluginDesc<Converter>> converters;
private final SortedSet<PluginDesc<HeaderConverter>> headerConverters;
private final SortedSet<PluginDesc<Transformation>> transformations;
private final SortedSet<PluginDesc<Predicate>> predicates;
private final SortedSet<PluginDesc<ConfigProvider>> configProviders;
private final SortedSet<PluginDesc<ConnectRestExtension>> restExtensions;
private final SortedSet<PluginDesc<ConnectorClientConfigOverridePolicy>> connectorClientConfigPolicies;
@ -92,6 +94,7 @@ public class DelegatingClassLoader extends URLClassLoader {
this.converters = new TreeSet<>();
this.headerConverters = new TreeSet<>();
this.transformations = new TreeSet<>();
this.predicates = new TreeSet<>();
this.configProviders = new TreeSet<>();
this.restExtensions = new TreeSet<>();
this.connectorClientConfigPolicies = new TreeSet<>();
@ -121,6 +124,10 @@ public class DelegatingClassLoader extends URLClassLoader {
return transformations;
}
public Set<PluginDesc<Predicate>> predicates() {
return predicates;
}
public Set<PluginDesc<ConfigProvider>> configProviders() {
return configProviders;
}
@ -269,6 +276,8 @@ public class DelegatingClassLoader extends URLClassLoader {
headerConverters.addAll(plugins.headerConverters());
addPlugins(plugins.transformations(), loader);
transformations.addAll(plugins.transformations());
addPlugins(plugins.predicates(), loader);
predicates.addAll(plugins.predicates());
addPlugins(plugins.configProviders(), loader);
configProviders.addAll(plugins.configProviders());
addPlugins(plugins.restExtensions(), loader);
@ -329,6 +338,7 @@ public class DelegatingClassLoader extends URLClassLoader {
getPluginDesc(reflections, Converter.class, loader),
getPluginDesc(reflections, HeaderConverter.class, loader),
getPluginDesc(reflections, Transformation.class, loader),
getPluginDesc(reflections, Predicate.class, loader),
getServiceLoaderPluginDesc(ConfigProvider.class, loader),
getServiceLoaderPluginDesc(ConnectRestExtension.class, loader),
getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, loader)
@ -402,6 +412,7 @@ public class DelegatingClassLoader extends URLClassLoader {
addAliases(converters);
addAliases(headerConverters);
addAliases(transformations);
addAliases(predicates);
addAliases(restExtensions);
addAliases(connectorClientConfigPolicies);
}

View File

@ -23,6 +23,7 @@ import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import java.util.Arrays;
import java.util.Collection;
@ -33,6 +34,7 @@ public class PluginScanResult {
private final Collection<PluginDesc<Converter>> converters;
private final Collection<PluginDesc<HeaderConverter>> headerConverters;
private final Collection<PluginDesc<Transformation>> transformations;
private final Collection<PluginDesc<Predicate>> predicates;
private final Collection<PluginDesc<ConfigProvider>> configProviders;
private final Collection<PluginDesc<ConnectRestExtension>> restExtensions;
private final Collection<PluginDesc<ConnectorClientConfigOverridePolicy>> connectorClientConfigPolicies;
@ -44,6 +46,7 @@ public class PluginScanResult {
Collection<PluginDesc<Converter>> converters,
Collection<PluginDesc<HeaderConverter>> headerConverters,
Collection<PluginDesc<Transformation>> transformations,
Collection<PluginDesc<Predicate>> predicates,
Collection<PluginDesc<ConfigProvider>> configProviders,
Collection<PluginDesc<ConnectRestExtension>> restExtensions,
Collection<PluginDesc<ConnectorClientConfigOverridePolicy>> connectorClientConfigPolicies
@ -52,6 +55,7 @@ public class PluginScanResult {
this.converters = converters;
this.headerConverters = headerConverters;
this.transformations = transformations;
this.predicates = predicates;
this.configProviders = configProviders;
this.restExtensions = restExtensions;
this.connectorClientConfigPolicies = connectorClientConfigPolicies;
@ -76,6 +80,10 @@ public class PluginScanResult {
return transformations;
}
public Collection<PluginDesc<Predicate>> predicates() {
return predicates;
}
public Collection<PluginDesc<ConfigProvider>> configProviders() {
return configProviders;
}

View File

@ -128,7 +128,7 @@ public class PluginUtils {
// added to the WHITELIST), then this base interface or class needs to be excluded in the
// regular expression pattern
private static final Pattern WHITELIST = Pattern.compile("^org\\.apache\\.kafka\\.(?:connect\\.(?:"
+ "transforms\\.(?!Transformation$).*"
+ "transforms\\.(?!Transformation|predicates\\.Predicate$).*"
+ "|json\\..*"
+ "|file\\..*"
+ "|mirror\\..*"

View File

@ -33,6 +33,7 @@ import org.apache.kafka.connect.storage.ConverterConfig;
import org.apache.kafka.connect.storage.ConverterType;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -167,6 +168,10 @@ public class Plugins {
return delegatingLoader.transformations();
}
public Set<PluginDesc<Predicate>> predicates() {
return delegatingLoader.predicates();
}
public Set<PluginDesc<ConfigProvider>> configProviders() {
return delegatingLoader.configProviders();
}

View File

@ -19,6 +19,7 @@ package org.apache.kafka.connect.tools;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.transforms.Cast;
import org.apache.kafka.connect.transforms.ExtractField;
import org.apache.kafka.connect.transforms.Filter;
import org.apache.kafka.connect.transforms.Flatten;
import org.apache.kafka.connect.transforms.HoistField;
import org.apache.kafka.connect.transforms.InsertField;
@ -60,7 +61,8 @@ public class TransformationDoc {
new DocInfo(RegexRouter.class.getName(), RegexRouter.OVERVIEW_DOC, RegexRouter.CONFIG_DEF),
new DocInfo(Flatten.class.getName(), Flatten.OVERVIEW_DOC, Flatten.CONFIG_DEF),
new DocInfo(Cast.class.getName(), Cast.OVERVIEW_DOC, Cast.CONFIG_DEF),
new DocInfo(TimestampConverter.class.getName(), TimestampConverter.OVERVIEW_DOC, TimestampConverter.CONFIG_DEF)
new DocInfo(TimestampConverter.class.getName(), TimestampConverter.OVERVIEW_DOC, TimestampConverter.CONFIG_DEF),
new DocInfo(Filter.class.getName(), Filter.OVERVIEW_DOC, Filter.CONFIG_DEF)
);
private static void printTransformationHtml(PrintStream out, DocInfo docInfo) {

View File

@ -17,6 +17,7 @@
package org.apache.kafka.connect.integration;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -26,6 +27,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@ -57,7 +59,19 @@ public class ConnectorHandle {
* @return a non-null {@link TaskHandle}
*/
public TaskHandle taskHandle(String taskId) {
return taskHandles.computeIfAbsent(taskId, k -> new TaskHandle(this, taskId));
return taskHandle(taskId, null);
}
/**
* Get or create a task handle for a given task id. The task need not be created when this method is called. If the
* handle is obtained before the task is created, the task will bind to the handle once it starts (or restarts).
*
* @param taskId the task id
* @param consumer A callback invoked when a sink task processes a record.
* @return a non-null {@link TaskHandle}
*/
public TaskHandle taskHandle(String taskId, Consumer<SinkRecord> consumer) {
return taskHandles.computeIfAbsent(taskId, k -> new TaskHandle(this, taskId, consumer));
}
/**

View File

@ -125,7 +125,7 @@ public class MonitorableSinkConnector extends TestSinkConnector {
@Override
public void put(Collection<SinkRecord> records) {
for (SinkRecord rec : records) {
taskHandle.record();
taskHandle.record(rec);
TopicPartition tp = cachedTopicPartitions
.computeIfAbsent(rec.topic(), v -> new HashMap<>())
.computeIfAbsent(rec.kafkaPartition(), v -> new TopicPartition(rec.topic(), rec.kafkaPartition()));

View File

@ -20,6 +20,7 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.runtime.TestSourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
@ -138,7 +139,9 @@ public class MonitorableSourceConnector extends TestSourceConnector {
Schema.STRING_SCHEMA,
"key-" + taskId + "-" + seqno,
Schema.STRING_SCHEMA,
"value-" + taskId + "-" + seqno))
"value-" + taskId + "-" + seqno,
null,
new ConnectHeaders().addLong("header-" + seqno, seqno)))
.collect(Collectors.toList());
}
return null;

View File

@ -17,12 +17,14 @@
package org.apache.kafka.connect.integration;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.IntStream;
/**
@ -37,22 +39,30 @@ public class TaskHandle {
private final ConnectorHandle connectorHandle;
private final AtomicInteger partitionsAssigned = new AtomicInteger(0);
private final StartAndStopCounter startAndStopCounter = new StartAndStopCounter();
private final Consumer<SinkRecord> consumer;
private CountDownLatch recordsRemainingLatch;
private CountDownLatch recordsToCommitLatch;
private int expectedRecords = -1;
private int expectedCommits = -1;
public TaskHandle(ConnectorHandle connectorHandle, String taskId) {
log.info("Created task {} for connector {}", taskId, connectorHandle);
public TaskHandle(ConnectorHandle connectorHandle, String taskId, Consumer<SinkRecord> consumer) {
this.taskId = taskId;
this.connectorHandle = connectorHandle;
this.consumer = consumer;
}
public void record() {
record(null);
}
/**
* Record a message arrival at the task and the connector overall.
*/
public void record() {
public void record(SinkRecord record) {
if (consumer != null && record != null) {
consumer.accept(record);
}
if (recordsRemainingLatch != null) {
recordsRemainingLatch.countDown();
}

View File

@ -0,0 +1,321 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.integration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.transforms.Filter;
import org.apache.kafka.connect.transforms.predicates.HasHeaderKey;
import org.apache.kafka.connect.transforms.predicates.RecordIsTombstone;
import org.apache.kafka.connect.transforms.predicates.TopicNameMatches;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Collections.singletonMap;
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.PREDICATES_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.TRANSFORMS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
/**
* An integration test for connectors with transformations
*/
@Category(IntegrationTest.class)
public class TransformationIntegrationTest {
private static final Logger log = LoggerFactory.getLogger(TransformationIntegrationTest.class);
private static final int NUM_RECORDS_PRODUCED = 2000;
private static final int NUM_TOPIC_PARTITIONS = 3;
private static final long RECORD_TRANSFER_DURATION_MS = TimeUnit.SECONDS.toMillis(30);
private static final long OBSERVED_RECORDS_DURATION_MS = TimeUnit.SECONDS.toMillis(60);
private static final int NUM_TASKS = 1;
private static final int NUM_WORKERS = 3;
private static final String CONNECTOR_NAME = "simple-conn";
private static final String SINK_CONNECTOR_CLASS_NAME = MonitorableSinkConnector.class.getSimpleName();
private static final String SOURCE_CONNECTOR_CLASS_NAME = MonitorableSourceConnector.class.getSimpleName();
private EmbeddedConnectCluster connect;
private ConnectorHandle connectorHandle;
@Before
public void setup() {
// setup Connect worker properties
Map<String, String> exampleWorkerProps = new HashMap<>();
exampleWorkerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(5_000));
// setup Kafka broker properties
Properties exampleBrokerProps = new Properties();
exampleBrokerProps.put("auto.create.topics.enable", "false");
// build a Connect cluster backed by Kafka and Zk
connect = new EmbeddedConnectCluster.Builder()
.name("connect-cluster")
.numWorkers(NUM_WORKERS)
.numBrokers(1)
.workerProps(exampleWorkerProps)
.brokerProps(exampleBrokerProps)
.build();
// start the clusters
connect.start();
// get a handle to the connector
connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
}
@After
public void close() {
// delete connector handle
RuntimeHandles.get().deleteConnector(CONNECTOR_NAME);
// stop all Connect, Kafka and Zk threads.
connect.stop();
}
/**
* Test the {@link Filter} transformation with a
* {@link TopicNameMatches} predicate on a sink connector.
*/
@Test
public void testFilterOnTopicNameWithSinkConnector() throws Exception {
assertConnectReady();
Map<String, Long> observedRecords = observeRecords();
// create test topics
String fooTopic = "foo-topic";
String barTopic = "bar-topic";
int numFooRecords = NUM_RECORDS_PRODUCED;
int numBarRecords = NUM_RECORDS_PRODUCED;
connect.kafka().createTopic(fooTopic, NUM_TOPIC_PARTITIONS);
connect.kafka().createTopic(barTopic, NUM_TOPIC_PARTITIONS);
// set up props for the sink connector
Map<String, String> props = new HashMap<>();
props.put("name", CONNECTOR_NAME);
props.put(CONNECTOR_CLASS_CONFIG, SINK_CONNECTOR_CLASS_NAME);
props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
props.put(TOPICS_CONFIG, String.join(",", fooTopic, barTopic));
props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(TRANSFORMS_CONFIG, "filter");
props.put(TRANSFORMS_CONFIG + ".filter.type", Filter.class.getSimpleName());
props.put(TRANSFORMS_CONFIG + ".filter.predicate", "barPredicate");
props.put(PREDICATES_CONFIG, "barPredicate");
props.put(PREDICATES_CONFIG + ".barPredicate.type", TopicNameMatches.class.getSimpleName());
props.put(PREDICATES_CONFIG + ".barPredicate.pattern", "bar-.*");
// expect all records to be consumed by the connector
connectorHandle.expectedRecords(numFooRecords);
// expect all records to be committed by the connector
connectorHandle.expectedCommits(numFooRecords);
// start a sink connector
connect.configureConnector(CONNECTOR_NAME, props);
assertConnectorRunning();
// produce some messages into source topic partitions
for (int i = 0; i < numBarRecords; i++) {
connect.kafka().produce(barTopic, i % NUM_TOPIC_PARTITIONS, "key", "simple-message-value-" + i);
}
for (int i = 0; i < numFooRecords; i++) {
connect.kafka().produce(fooTopic, i % NUM_TOPIC_PARTITIONS, "key", "simple-message-value-" + i);
}
// consume all records from the source topic or fail, to ensure that they were correctly produced.
assertEquals("Unexpected number of records consumed", numFooRecords,
connect.kafka().consume(numFooRecords, RECORD_TRANSFER_DURATION_MS, fooTopic).count());
assertEquals("Unexpected number of records consumed", numBarRecords,
connect.kafka().consume(numBarRecords, RECORD_TRANSFER_DURATION_MS, barTopic).count());
// wait for the connector tasks to consume all records.
connectorHandle.awaitRecords(RECORD_TRANSFER_DURATION_MS);
// wait for the connector tasks to commit all records.
connectorHandle.awaitCommits(RECORD_TRANSFER_DURATION_MS);
// Assert that we didn't see any records from the bar topic
Map<String, Long> expectedRecordCounts = singletonMap(fooTopic, Long.valueOf(numFooRecords));
assertObservedRecords(observedRecords, expectedRecordCounts);
// delete connector
connect.deleteConnector(CONNECTOR_NAME);
}
private void assertConnectReady() throws InterruptedException {
connect.assertions().assertExactlyNumBrokersAreUp(1, "Brokers did not start in time.");
connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS, "Worker did not start in time.");
log.info("Completed startup of {} Kafka brokers and {} Connect workers", 1, NUM_WORKERS);
}
private void assertConnectorRunning() throws InterruptedException {
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
"Connector tasks did not start in time.");
}
private void assertObservedRecords(Map<String, Long> observedRecords, Map<String, Long> expectedRecordCounts) throws InterruptedException {
waitForCondition(() -> expectedRecordCounts.equals(observedRecords),
OBSERVED_RECORDS_DURATION_MS,
() -> "The observed records should be " + expectedRecordCounts + " but was " + observedRecords);
}
private Map<String, Long> observeRecords() {
Map<String, Long> observedRecords = new HashMap<>();
// record all the records we see
connectorHandle.taskHandle(CONNECTOR_NAME + "-0",
record -> observedRecords.compute(record.topic(),
(key, value) -> value == null ? 1 : value + 1));
return observedRecords;
}
/**
* Test the {@link Filter} transformation with a
* {@link RecordIsTombstone} predicate on a sink connector.
*/
@Test
public void testFilterOnTombstonesWithSinkConnector() throws Exception {
assertConnectReady();
Map<String, Long> observedRecords = observeRecords();
// create test topics
String topic = "foo-topic";
int numRecords = NUM_RECORDS_PRODUCED;
connect.kafka().createTopic(topic, NUM_TOPIC_PARTITIONS);
// set up props for the sink connector
Map<String, String> props = new HashMap<>();
props.put("name", CONNECTOR_NAME);
props.put(CONNECTOR_CLASS_CONFIG, SINK_CONNECTOR_CLASS_NAME);
props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
props.put(TOPICS_CONFIG, String.join(",", topic));
props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(TRANSFORMS_CONFIG, "filter");
props.put(TRANSFORMS_CONFIG + ".filter.type", Filter.class.getSimpleName());
props.put(TRANSFORMS_CONFIG + ".filter.predicate", "barPredicate");
props.put(PREDICATES_CONFIG, "barPredicate");
props.put(PREDICATES_CONFIG + ".barPredicate.type", RecordIsTombstone.class.getSimpleName());
// expect all records to be committed, but only the non-tombstone half to reach the task
connectorHandle.expectedCommits(numRecords);
connectorHandle.expectedRecords(numRecords / 2);
// start a sink connector
connect.configureConnector(CONNECTOR_NAME, props);
assertConnectorRunning();
// produce some messages into source topic partitions
for (int i = 0; i < numRecords; i++) {
connect.kafka().produce(topic, i % NUM_TOPIC_PARTITIONS, "key", i % 2 == 0 ? "simple-message-value-" + i : null);
}
// consume all records from the source topic or fail, to ensure that they were correctly produced.
assertEquals("Unexpected number of records consumed", numRecords,
connect.kafka().consume(numRecords, RECORD_TRANSFER_DURATION_MS, topic).count());
// wait for the connector tasks to consume all records.
connectorHandle.awaitRecords(RECORD_TRANSFER_DURATION_MS);
// wait for the connector tasks to commit all records.
connectorHandle.awaitCommits(RECORD_TRANSFER_DURATION_MS);
Map<String, Long> expectedRecordCounts = singletonMap(topic, Long.valueOf(numRecords / 2));
assertObservedRecords(observedRecords, expectedRecordCounts);
// delete connector
connect.deleteConnector(CONNECTOR_NAME);
}
/**
* Test the {@link Filter} transformation with a
* {@link HasHeaderKey} predicate on a source connector.
*/
@Test
public void testFilterOnHasHeaderKeyWithSourceConnector() throws Exception {
assertConnectReady();
// create test topic
connect.kafka().createTopic("test-topic", NUM_TOPIC_PARTITIONS);
// set up props for the source connector
Map<String, String> props = new HashMap<>();
props.put("name", CONNECTOR_NAME);
props.put(CONNECTOR_CLASS_CONFIG, SOURCE_CONNECTOR_CLASS_NAME);
props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
props.put("topic", "test-topic");
props.put("throughput", String.valueOf(500));
props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(TRANSFORMS_CONFIG, "filter");
props.put(TRANSFORMS_CONFIG + ".filter.type", Filter.class.getSimpleName());
props.put(TRANSFORMS_CONFIG + ".filter.predicate", "headerPredicate");
props.put(TRANSFORMS_CONFIG + ".filter.negate", "true");
props.put(PREDICATES_CONFIG, "headerPredicate");
props.put(PREDICATES_CONFIG + ".headerPredicate.type", HasHeaderKey.class.getSimpleName());
props.put(PREDICATES_CONFIG + ".headerPredicate.name", "header-8");
// expect all records to be produced by the connector
connectorHandle.expectedRecords(NUM_RECORDS_PRODUCED);
// expect all records to be committed by the connector
connectorHandle.expectedCommits(NUM_RECORDS_PRODUCED);
// validate the connector configuration; we expect it to be valid
connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(SOURCE_CONNECTOR_CLASS_NAME, props, 0,
"Validating connector configuration produced an unexpected number or errors.");
// start a source connector
connect.configureConnector(CONNECTOR_NAME, props);
assertConnectorRunning();
// wait for the connector tasks to produce enough records
connectorHandle.awaitRecords(RECORD_TRANSFER_DURATION_MS);
// wait for the connector tasks to commit enough records
connectorHandle.awaitCommits(RECORD_TRANSFER_DURATION_MS);
// consume records from the output topic and verify that each carries the expected header
for (ConsumerRecord<byte[], byte[]> record : connect.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "test-topic")) {
assertNotNull("Expected header to exist",
record.headers().lastHeader("header-8"));
}
// delete connector
connect.deleteConnector(CONNECTOR_NAME);
}
}

View File

@ -42,6 +42,7 @@ import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.easymock.Capture;
import org.easymock.EasyMock;
@ -291,11 +292,12 @@ public class AbstractHerderTest {
// the config fields for SourceConnectorConfig, but we expect these to change rarely.
assertEquals(TestSourceConnector.class.getName(), result.name());
assertEquals(Arrays.asList(ConnectorConfig.COMMON_GROUP, ConnectorConfig.TRANSFORMS_GROUP,
ConnectorConfig.ERROR_GROUP, SourceConnectorConfig.TOPIC_CREATION_GROUP), result.groups());
ConnectorConfig.PREDICATES_GROUP, ConnectorConfig.ERROR_GROUP, SourceConnectorConfig.TOPIC_CREATION_GROUP), result.groups());
assertEquals(2, result.errorCount());
Map<String, ConfigInfo> infos = result.values().stream()
.collect(Collectors.toMap(info -> info.configKey().name(), Function.identity()));
assertEquals(16, infos.size());
// Base connector config has 14 fields, connector's configs add 2
assertEquals(17, infos.size());
// Missing name should generate an error
assertEquals(ConnectorConfig.NAME_CONFIG,
infos.get(ConnectorConfig.NAME_CONFIG).configValue().name());
@ -351,6 +353,7 @@ public class AbstractHerderTest {
List<String> expectedGroups = Arrays.asList(
ConnectorConfig.COMMON_GROUP,
ConnectorConfig.TRANSFORMS_GROUP,
ConnectorConfig.PREDICATES_GROUP,
ConnectorConfig.ERROR_GROUP,
SourceConnectorConfig.TOPIC_CREATION_GROUP,
"Transforms: xformA",
@ -360,7 +363,7 @@ public class AbstractHerderTest {
assertEquals(2, result.errorCount());
Map<String, ConfigInfo> infos = result.values().stream()
.collect(Collectors.toMap(info -> info.configKey().name(), Function.identity()));
assertEquals(19, infos.size());
assertEquals(22, infos.size());
// Should get 2 type fields from the transforms, first adds its own config since it has a valid class
assertEquals("transforms.xformA.type",
infos.get("transforms.xformA.type").configValue().name());
@ -373,6 +376,78 @@ public class AbstractHerderTest {
verifyAll();
}
@Test()
public void testConfigValidationPredicatesExtendResults() {
AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class, noneConnectorClientConfigOverridePolicy);
// 1 transformation alias and 2 predicate aliases defined -> 1 transformation lookup and 2 predicate lookups
Set<PluginDesc<Transformation>> transformations = new HashSet<>();
transformations.add(new PluginDesc<Transformation>(SampleTransformation.class, "1.0", classLoader));
EasyMock.expect(plugins.transformations()).andReturn(transformations).times(1);
Set<PluginDesc<Predicate>> predicates = new HashSet<>();
predicates.add(new PluginDesc<Predicate>(SamplePredicate.class, "1.0", classLoader));
EasyMock.expect(plugins.predicates()).andReturn(predicates).times(2);
replayAll();
// Define 1 transformation and 2 predicates. predX has a class defined and so can get embedded configs;
// predY is missing class info, which should generate an error.
Map<String, String> config = new HashMap<>();
config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestSourceConnector.class.getName());
config.put(ConnectorConfig.NAME_CONFIG, "connector-name");
config.put(ConnectorConfig.TRANSFORMS_CONFIG, "xformA");
config.put(ConnectorConfig.TRANSFORMS_CONFIG + ".xformA.type", SampleTransformation.class.getName());
config.put(ConnectorConfig.TRANSFORMS_CONFIG + ".xformA.predicate", "predX");
config.put(ConnectorConfig.PREDICATES_CONFIG, "predX,predY");
config.put(ConnectorConfig.PREDICATES_CONFIG + ".predX.type", SamplePredicate.class.getName());
config.put("required", "value"); // connector required config
ConfigInfos result = herder.validateConnectorConfig(config);
assertEquals(herder.connectorTypeForClass(config.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)), ConnectorType.SOURCE);
// We expect there to be errors due to the missing name and .... Note that these assertions depend heavily on
// the config fields for SourceConnectorConfig, but we expect these to change rarely.
assertEquals(TestSourceConnector.class.getName(), result.name());
// Each transformation and predicate also gets its own group
List<String> expectedGroups = Arrays.asList(
ConnectorConfig.COMMON_GROUP,
ConnectorConfig.TRANSFORMS_GROUP,
ConnectorConfig.PREDICATES_GROUP,
ConnectorConfig.ERROR_GROUP,
SourceConnectorConfig.TOPIC_CREATION_GROUP,
"Transforms: xformA",
"Predicates: predX",
"Predicates: predY"
);
assertEquals(expectedGroups, result.groups());
assertEquals(2, result.errorCount());
Map<String, ConfigInfo> infos = result.values().stream()
.collect(Collectors.toMap(info -> info.configKey().name(), Function.identity()));
assertEquals(24, infos.size());
// Each alias gets a type field; the transform and predX add their own configs since they have valid classes
assertEquals("transforms.xformA.type",
infos.get("transforms.xformA.type").configValue().name());
assertTrue(infos.get("transforms.xformA.type").configValue().errors().isEmpty());
assertEquals("transforms.xformA.subconfig",
infos.get("transforms.xformA.subconfig").configValue().name());
assertEquals("transforms.xformA.predicate",
infos.get("transforms.xformA.predicate").configValue().name());
assertTrue(infos.get("transforms.xformA.predicate").configValue().errors().isEmpty());
assertEquals("transforms.xformA.negate",
infos.get("transforms.xformA.negate").configValue().name());
assertTrue(infos.get("transforms.xformA.negate").configValue().errors().isEmpty());
assertEquals("predicates.predX.type",
infos.get("predicates.predX.type").configValue().name());
assertEquals("predicates.predX.predconfig",
infos.get("predicates.predX.predconfig").configValue().name());
assertEquals("predicates.predY.type",
infos.get("predicates.predY.type").configValue().name());
assertFalse(
infos.get("predicates.predY.type").configValue().errors().isEmpty());
verifyAll();
}
@Test()
public void testConfigValidationPrincipalOnlyOverride() {
AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class, new PrincipalConnectorClientConfigOverridePolicy());
@ -397,13 +472,14 @@ public class AbstractHerderTest {
List<String> expectedGroups = Arrays.asList(
ConnectorConfig.COMMON_GROUP,
ConnectorConfig.TRANSFORMS_GROUP,
ConnectorConfig.PREDICATES_GROUP,
ConnectorConfig.ERROR_GROUP,
SourceConnectorConfig.TOPIC_CREATION_GROUP
);
assertEquals(expectedGroups, result.groups());
assertEquals(1, result.errorCount());
// Base connector config has 13 fields, connector's configs add 2, and 2 producer overrides
assertEquals(18, result.values().size());
// Base connector config has 14 fields, connector's configs add 2, and 2 producer overrides
assertEquals(19, result.values().size());
assertTrue(result.values().stream().anyMatch(
configInfo -> ackConfigKey.equals(configInfo.configValue().name()) && !configInfo.configValue().errors().isEmpty()));
assertTrue(result.values().stream().anyMatch(
@ -564,6 +640,30 @@ public class AbstractHerderTest {
}
}
public static class SamplePredicate<R extends ConnectRecord<R>> implements Predicate<R> {
@Override
public ConfigDef config() {
return new ConfigDef()
.define("predconfig", ConfigDef.Type.STRING, "default", ConfigDef.Importance.LOW, "docs");
}
@Override
public boolean test(R record) {
return false;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
// We need to use a real class here because java.lang.Class is final and cannot be mocked
private abstract class BogusSourceConnector extends SourceConnector {
}
View File
@ -23,6 +23,7 @@ import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import org.junit.Test;
import java.util.Collections;
@ -32,6 +33,7 @@ import java.util.Map;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -81,41 +83,45 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
new ConnectorConfig(MOCK_PLUGINS, props);
}
@Test
public void danglingTransformAlias() {
Map<String, String> props = new HashMap<>();
props.put("name", "test");
props.put("connector.class", TestConnector.class.getName());
props.put("transforms", "dangler");
ConfigException e = assertThrows(ConfigException.class, () -> new ConnectorConfig(MOCK_PLUGINS, props));
assertTrue(e.getMessage().contains("Not a Transformation"));
}
@Test
public void emptyConnectorName() {
Map<String, String> props = new HashMap<>();
props.put("name", "");
props.put("connector.class", TestConnector.class.getName());
ConfigException e = assertThrows(ConfigException.class, () -> new ConnectorConfig(MOCK_PLUGINS, props));
assertTrue(e.getMessage().contains("String may not be empty"));
}
@Test
public void wrongTransformationType() {
Map<String, String> props = new HashMap<>();
props.put("name", "test");
props.put("connector.class", TestConnector.class.getName());
props.put("transforms", "a");
props.put("transforms.a.type", "uninstantiable");
ConfigException e = assertThrows(ConfigException.class, () -> new ConnectorConfig(MOCK_PLUGINS, props));
assertTrue(e.getMessage().contains("Class uninstantiable could not be found"));
}
@Test
public void unconfiguredTransform() {
Map<String, String> props = new HashMap<>();
props.put("name", "test");
props.put("connector.class", TestConnector.class.getName());
props.put("transforms", "a");
props.put("transforms.a.type", SimpleTransformation.class.getName());
ConfigException e = assertThrows(ConfigException.class, () -> new ConnectorConfig(MOCK_PLUGINS, props));
assertTrue(e.getMessage().contains("Missing required configuration \"transforms.a.magic.number\" which"));
}
@Test
@ -126,12 +132,8 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
props.put("transforms", "a");
props.put("transforms.a.type", SimpleTransformation.class.getName());
props.put("transforms.a.magic.number", "40");
ConfigException e = assertThrows(ConfigException.class, () -> new ConnectorConfig(MOCK_PLUGINS, props));
assertTrue(e.getMessage().contains("Value must be at least 42"));
}
@Test
@ -214,6 +216,189 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
}
}
@Test
public void wrongPredicateType() {
Map<String, String> props = new HashMap<>();
props.put("name", "test");
props.put("connector.class", TestConnector.class.getName());
props.put("transforms", "a");
props.put("transforms.a.type", SimpleTransformation.class.getName());
props.put("transforms.a.magic.number", "42");
props.put("transforms.a.predicate", "my-pred");
props.put("predicates", "my-pred");
props.put("predicates.my-pred.type", TestConnector.class.getName());
ConfigException e = assertThrows(ConfigException.class, () -> new ConnectorConfig(MOCK_PLUGINS, props));
assertTrue(e.getMessage().contains("Not a Predicate"));
}
@Test
public void singleConditionalTransform() {
Map<String, String> props = new HashMap<>();
props.put("name", "test");
props.put("connector.class", TestConnector.class.getName());
props.put("transforms", "a");
props.put("transforms.a.type", SimpleTransformation.class.getName());
props.put("transforms.a.magic.number", "42");
props.put("transforms.a.predicate", "my-pred");
props.put("transforms.a.negate", "true");
props.put("predicates", "my-pred");
props.put("predicates.my-pred.type", TestPredicate.class.getName());
props.put("predicates.my-pred.int", "84");
assertPredicatedTransform(props, true);
}
@Test
public void predicateNegationDefaultsToFalse() {
Map<String, String> props = new HashMap<>();
props.put("name", "test");
props.put("connector.class", TestConnector.class.getName());
props.put("transforms", "a");
props.put("transforms.a.type", SimpleTransformation.class.getName());
props.put("transforms.a.magic.number", "42");
props.put("transforms.a.predicate", "my-pred");
props.put("predicates", "my-pred");
props.put("predicates.my-pred.type", TestPredicate.class.getName());
props.put("predicates.my-pred.int", "84");
assertPredicatedTransform(props, false);
}
@Test
public void abstractPredicate() {
Map<String, String> props = new HashMap<>();
props.put("name", "test");
props.put("connector.class", TestConnector.class.getName());
props.put("transforms", "a");
props.put("transforms.a.type", SimpleTransformation.class.getName());
props.put("transforms.a.magic.number", "42");
props.put("transforms.a.predicate", "my-pred");
props.put("predicates", "my-pred");
props.put("predicates.my-pred.type", AbstractTestPredicate.class.getName());
props.put("predicates.my-pred.int", "84");
ConfigException e = assertThrows(ConfigException.class, () -> new ConnectorConfig(MOCK_PLUGINS, props));
assertTrue(e.getMessage().contains("Predicate is abstract and cannot be created"));
}
private void assertPredicatedTransform(Map<String, String> props, boolean expectedNegated) {
final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, props);
final List<Transformation<R>> transformations = config.transformations();
assertEquals(1, transformations.size());
assertTrue(transformations.get(0) instanceof PredicatedTransformation);
PredicatedTransformation<?> predicated = (PredicatedTransformation<?>) transformations.get(0);
assertEquals(expectedNegated, predicated.negate);
assertTrue(predicated.delegate instanceof ConnectorConfigTest.SimpleTransformation);
assertEquals(42, ((SimpleTransformation<?>) predicated.delegate).magicNumber);
assertTrue(predicated.predicate instanceof ConnectorConfigTest.TestPredicate);
assertEquals(84, ((TestPredicate<?>) predicated.predicate).param);
predicated.close();
assertEquals(0, ((SimpleTransformation<?>) predicated.delegate).magicNumber);
assertEquals(0, ((TestPredicate<?>) predicated.predicate).param);
}
@Test
public void misconfiguredPredicate() {
Map<String, String> props = new HashMap<>();
props.put("name", "test");
props.put("connector.class", TestConnector.class.getName());
props.put("transforms", "a");
props.put("transforms.a.type", SimpleTransformation.class.getName());
props.put("transforms.a.magic.number", "42");
props.put("transforms.a.predicate", "my-pred");
props.put("transforms.a.negate", "true");
props.put("predicates", "my-pred");
props.put("predicates.my-pred.type", TestPredicate.class.getName());
props.put("predicates.my-pred.int", "79");
ConfigException e = assertThrows(ConfigException.class, () -> new ConnectorConfig(MOCK_PLUGINS, props));
assertTrue(e.getMessage().contains("Value must be at least 80"));
}
@Test
public void missingPredicateAliasProperty() {
Map<String, String> props = new HashMap<>();
props.put("name", "test");
props.put("connector.class", TestConnector.class.getName());
props.put("transforms", "a");
props.put("transforms.a.type", SimpleTransformation.class.getName());
props.put("transforms.a.magic.number", "42");
props.put("transforms.a.predicate", "my-pred");
// listing the alias under "predicates" is technically not needed for the predicate config to be picked up
//props.put("predicates", "my-pred");
props.put("predicates.my-pred.type", TestPredicate.class.getName());
props.put("predicates.my-pred.int", "84");
new ConnectorConfig(MOCK_PLUGINS, props);
}
@Test
public void missingPredicateConfig() {
Map<String, String> props = new HashMap<>();
props.put("name", "test");
props.put("connector.class", TestConnector.class.getName());
props.put("transforms", "a");
props.put("transforms.a.type", SimpleTransformation.class.getName());
props.put("transforms.a.magic.number", "42");
props.put("transforms.a.predicate", "my-pred");
props.put("predicates", "my-pred");
//props.put("predicates.my-pred.type", TestPredicate.class.getName());
//props.put("predicates.my-pred.int", "84");
ConfigException e = assertThrows(ConfigException.class, () -> new ConnectorConfig(MOCK_PLUGINS, props));
assertTrue(e.getMessage().contains("Not a Predicate"));
}
@Test
public void negatedButNoPredicate() {
Map<String, String> props = new HashMap<>();
props.put("name", "test");
props.put("connector.class", TestConnector.class.getName());
props.put("transforms", "a");
props.put("transforms.a.type", SimpleTransformation.class.getName());
props.put("transforms.a.magic.number", "42");
props.put("transforms.a.negate", "true");
ConfigException e = assertThrows(ConfigException.class, () -> new ConnectorConfig(MOCK_PLUGINS, props));
assertTrue(e.getMessage().contains("there is no config 'transforms.a.predicate' defining a predicate to be negated"));
}
public static class TestPredicate<R extends ConnectRecord<R>> implements Predicate<R> {
int param;
public TestPredicate() { }
@Override
public ConfigDef config() {
return new ConfigDef().define("int", ConfigDef.Type.INT, 80, ConfigDef.Range.atLeast(80), ConfigDef.Importance.MEDIUM,
"A test parameter");
}
@Override
public boolean test(R record) {
return false;
}
@Override
public void close() {
param = 0;
}
@Override
public void configure(Map<String, ?> configs) {
param = Integer.parseInt((String) configs.get("int"));
}
}
public static abstract class AbstractTestPredicate<R extends ConnectRecord<R>> implements Predicate<R> {
public AbstractTestPredicate() { }
}
public static abstract class AbstractTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
}
View File
@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.runtime;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import org.junit.Test;
import static java.util.Collections.singletonMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class PredicatedTransformationTest {
private final SourceRecord initial = new SourceRecord(singletonMap("initial", 1), null, null, null, null);
private final SourceRecord transformed = new SourceRecord(singletonMap("transformed", 2), null, null, null, null);
@Test
public void apply() {
applyAndAssert(true, false, transformed);
applyAndAssert(true, true, initial);
applyAndAssert(false, false, initial);
applyAndAssert(false, true, transformed);
}
private void applyAndAssert(boolean predicateResult, boolean negate,
SourceRecord expectedResult) {
class TestTransformation implements Transformation<SourceRecord> {
private boolean closed = false;
private SourceRecord transformedRecord;
private TestTransformation(SourceRecord transformedRecord) {
this.transformedRecord = transformedRecord;
}
@Override
public SourceRecord apply(SourceRecord record) {
return transformedRecord;
}
@Override
public ConfigDef config() {
return null;
}
@Override
public void close() {
closed = true;
}
@Override
public void configure(Map<String, ?> configs) {
}
private void assertClosed() {
assertTrue("Transformer should be closed", closed);
}
}
class TestPredicate implements Predicate<SourceRecord> {
private boolean testResult;
private boolean closed = false;
private TestPredicate(boolean testResult) {
this.testResult = testResult;
}
@Override
public ConfigDef config() {
return null;
}
@Override
public boolean test(SourceRecord record) {
return testResult;
}
@Override
public void close() {
closed = true;
}
@Override
public void configure(Map<String, ?> configs) {
}
private void assertClosed() {
assertTrue("Predicate should be closed", closed);
}
}
TestPredicate predicate = new TestPredicate(predicateResult);
TestTransformation predicatedTransform = new TestTransformation(transformed);
PredicatedTransformation<SourceRecord> pt = new PredicatedTransformation<>(
predicate,
negate,
predicatedTransform);
assertEquals(expectedResult, pt.apply(initial));
pt.close();
predicate.assertClosed();
predicatedTransform.assertClosed();
}
}
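// Taken together, the four cases exercised in apply() reduce to one rule: the
// delegate transformation runs exactly when predicate.test(record) != negate.
// A one-line sketch of that rule (PredicatedTransformation's implementation is
// not reproduced here, so this is an inference from the assertions above):
//
//     R result = predicate.test(record) != negate ? delegate.apply(record) : record;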
View File
@ -99,6 +99,9 @@ public class PluginUtilsTest {
assertFalse(PluginUtils.shouldLoadInIsolation(
"org.apache.kafka.connect.transforms.Transformation")
);
assertFalse(PluginUtils.shouldLoadInIsolation(
"org.apache.kafka.connect.transforms.predicates.Predicate")
);
assertFalse(PluginUtils.shouldLoadInIsolation(
"org.apache.kafka.connect.storage.Converter")
);
@ -131,6 +134,10 @@ public class PluginUtilsTest {
assertTrue(PluginUtils.shouldLoadInIsolation(
"org.apache.kafka.connect.transforms.ExtractField$Key")
);
assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.transforms.predicates."));
assertTrue(PluginUtils.shouldLoadInIsolation(
"org.apache.kafka.connect.transforms.predicates.TopicNameMatches")
);
assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.json."));
assertTrue(PluginUtils.shouldLoadInIsolation(
"org.apache.kafka.connect.json.JsonConverter")
View File
@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.transforms;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
/**
* Drops all records, filtering them from subsequent transformations in the chain.
* This is intended to be used conditionally to filter out records matching (or not matching)
* a particular {@link org.apache.kafka.connect.transforms.predicates.Predicate}.
* @param <R> The type of record.
*/
public class Filter<R extends ConnectRecord<R>> implements Transformation<R> {
public static final String OVERVIEW_DOC = "Drops all records, filtering them from subsequent transformations in the chain. " +
"This is intended to be used conditionally to filter out records matching (or not matching) " +
"a particular Predicate.";
public static final ConfigDef CONFIG_DEF = new ConfigDef();
@Override
public R apply(R record) {
return null;
}
@Override
public ConfigDef config() {
return CONFIG_DEF;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
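// A minimal usage sketch, assuming the five-argument SourceRecord constructor used
// elsewhere in this commit. Filter.apply() returns null for every record, which the
// transformation chain treats as dropping it; in practice Filter is guarded by a
// predicate via the transform's "predicate" (and optional "negate") properties.
import java.util.Collections;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.Filter;

class FilterSketch {
    public static void main(String[] args) {
        Filter<SourceRecord> filter = new Filter<>();
        filter.configure(Collections.emptyMap()); // Filter defines no configuration
        SourceRecord record = new SourceRecord(null, null, "topic", null, null);
        System.out.println(filter.apply(record) == null); // true: the record is dropped
        filter.close();
    }
}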
View File
@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.transforms.predicates;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
/**
* A predicate which is true for records with at least one header with the configured name.
* @param <R> The type of connect record.
*/
public class HasHeaderKey<R extends ConnectRecord<R>> implements Predicate<R> {
private static final String NAME_CONFIG = "name";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE,
new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM,
"The header name.");
private String name;
@Override
public ConfigDef config() {
return CONFIG_DEF;
}
@Override
public boolean test(R record) {
Iterator<Header> headerIterator = record.headers().allWithName(name);
return headerIterator != null && headerIterator.hasNext();
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
this.name = new SimpleConfig(config(), configs).getString(NAME_CONFIG);
}
@Override
public String toString() {
return "HasHeaderKey{" +
"name='" + name + '\'' +
'}';
}
}
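// A minimal usage sketch, assuming ConnectHeaders (the runtime's standard Headers
// implementation) to attach a header: the predicate matches records that carry at
// least one header with the configured name, regardless of the header's value.
import java.util.Collections;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.predicates.HasHeaderKey;

class HasHeaderKeySketch {
    public static void main(String[] args) {
        HasHeaderKey<SourceRecord> predicate = new HasHeaderKey<>();
        predicate.configure(Collections.singletonMap("name", "my-header"));

        ConnectHeaders headers = new ConnectHeaders();
        headers.addString("my-header", "some-value");
        SourceRecord withHeader = new SourceRecord(null, null, "topic", null, null,
                null, null, null, null, headers);

        System.out.println(predicate.test(withHeader)); // true
        System.out.println(predicate.test(new SourceRecord(null, null, "topic", null, null))); // false
        predicate.close();
    }
}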
View File
@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.transforms.predicates;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
/**
* A predicate which is true for records which are tombstones (i.e. have a null value).
* @param <R> The type of connect record.
*/
public class RecordIsTombstone<R extends ConnectRecord<R>> implements Predicate<R> {
private static final ConfigDef CONFIG_DEF = new ConfigDef();
@Override
public ConfigDef config() {
return CONFIG_DEF;
}
@Override
public boolean test(R record) {
return record.value() == null;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public String toString() {
return "RecordIsTombstone{}";
}
}
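// A minimal usage sketch: a record whose value is null is a tombstone. Paired with
// Filter (and optionally "negate"), this predicate lets tombstones be dropped, or
// everything except tombstones be dropped.
import java.util.Collections;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.predicates.RecordIsTombstone;

class RecordIsTombstoneSketch {
    public static void main(String[] args) {
        RecordIsTombstone<SourceRecord> predicate = new RecordIsTombstone<>();
        predicate.configure(Collections.emptyMap()); // no configuration defined
        SourceRecord tombstone = new SourceRecord(null, null, "topic", null, null); // null value
        SourceRecord withValue = new SourceRecord(null, null, "topic", null, "some-value");
        System.out.println(predicate.test(tombstone)); // true
        System.out.println(predicate.test(withValue)); // false
    }
}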
View File
@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.transforms.predicates;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.util.RegexValidator;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
/**
* A predicate which is true for records with a topic name that matches the configured regular expression.
* @param <R> The type of connect record.
*/
public class TopicNameMatches<R extends ConnectRecord<R>> implements Predicate<R> {
private static final String PATTERN_CONFIG = "pattern";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(PATTERN_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE,
ConfigDef.CompositeValidator.of(new ConfigDef.NonEmptyString(), new RegexValidator()),
ConfigDef.Importance.MEDIUM,
"A Java regular expression for matching against the name of a record's topic.");
private Pattern pattern;
@Override
public ConfigDef config() {
return CONFIG_DEF;
}
@Override
public boolean test(R record) {
return record.topic() != null && pattern.matcher(record.topic()).matches();
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
SimpleConfig simpleConfig = new SimpleConfig(config(), configs);
Pattern result;
String value = simpleConfig.getString(PATTERN_CONFIG);
try {
result = Pattern.compile(value);
} catch (PatternSyntaxException e) {
throw new ConfigException(PATTERN_CONFIG, value, "entry must be a Java-compatible regular expression: " + e.getMessage());
}
this.pattern = result;
}
@Override
public String toString() {
return "TopicNameMatches{" +
"pattern=" + pattern +
'}';
}
}
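// A minimal usage sketch: the configured pattern must match the entire topic name
// (the implementation calls matches(), not find()), so matching a prefix requires
// an explicit ".*" suffix, as in the tests below.
import java.util.Collections;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.predicates.TopicNameMatches;

class TopicNameMatchesSketch {
    public static void main(String[] args) {
        TopicNameMatches<SourceRecord> predicate = new TopicNameMatches<>();
        predicate.configure(Collections.singletonMap("pattern", "my-prefix-.*"));
        System.out.println(predicate.test(new SourceRecord(null, null, "my-prefix-foo", null, null))); // true
        System.out.println(predicate.test(new SourceRecord(null, null, "other-topic", null, null)));   // false
    }
}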
View File
@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.transforms.predicates;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
import org.junit.Test;
import static java.util.Collections.singletonList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
public class HasHeaderKeyTest {
@Test
public void testNameRequiredInConfig() {
Map<String, String> props = new HashMap<>();
ConfigException e = assertThrows(ConfigException.class, () -> config(props));
assertTrue(e.getMessage().contains("Missing required configuration \"name\""));
}
@Test
public void testNameMayNotBeEmptyInConfig() {
Map<String, String> props = new HashMap<>();
props.put("name", "");
ConfigException e = assertThrows(ConfigException.class, () -> config(props));
assertTrue(e.getMessage().contains("String must be non-empty"));
}
@Test
public void testConfig() {
HasHeaderKey<SourceRecord> predicate = new HasHeaderKey<>();
predicate.config().validate(Collections.singletonMap("name", "foo"));
List<ConfigValue> configs = predicate.config().validate(Collections.singletonMap("name", ""));
assertEquals(singletonList("Invalid value for configuration name: String must be non-empty"), configs.get(0).errorMessages());
}
@Test
public void testTest() {
HasHeaderKey<SourceRecord> predicate = new HasHeaderKey<>();
predicate.configure(Collections.singletonMap("name", "foo"));
assertTrue(predicate.test(recordWithHeaders("foo")));
assertTrue(predicate.test(recordWithHeaders("foo", "bar")));
assertTrue(predicate.test(recordWithHeaders("bar", "foo", "bar", "foo")));
assertFalse(predicate.test(recordWithHeaders("bar")));
assertFalse(predicate.test(recordWithHeaders("bar", "bar")));
assertFalse(predicate.test(recordWithHeaders()));
assertFalse(predicate.test(new SourceRecord(null, null, null, null, null)));
}
private SimpleConfig config(Map<String, String> props) {
return new SimpleConfig(new HasHeaderKey<>().config(), props);
}
private SourceRecord recordWithHeaders(String... headers) {
return new SourceRecord(null, null, null, null, null, null, null, null, null,
Arrays.stream(headers).map(header -> new TestHeader(header)).collect(Collectors.toList()));
}
private static class TestHeader implements Header {
private final String key;
public TestHeader(String key) {
this.key = key;
}
@Override
public String key() {
return key;
}
@Override
public Schema schema() {
return null;
}
@Override
public Object value() {
return null;
}
@Override
public Header with(Schema schema, Object value) {
return null;
}
@Override
public Header rename(String key) {
return null;
}
}
}
View File
@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.transforms.predicates;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
public class TopicNameMatchesTest {
@Test
public void testPatternRequiredInConfig() {
Map<String, String> props = new HashMap<>();
ConfigException e = assertThrows(ConfigException.class, () -> config(props));
assertTrue(e.getMessage().contains("Missing required configuration \"pattern\""));
}
@Test
public void testPatternMayNotBeEmptyInConfig() {
Map<String, String> props = new HashMap<>();
props.put("pattern", "");
ConfigException e = assertThrows(ConfigException.class, () -> config(props));
assertTrue(e.getMessage().contains("String must be non-empty"));
}
@Test
public void testPatternIsValidRegexInConfig() {
Map<String, String> props = new HashMap<>();
props.put("pattern", "[");
ConfigException e = assertThrows(ConfigException.class, () -> config(props));
assertTrue(e.getMessage().contains("Invalid regex"));
}
@Test
public void testConfig() {
TopicNameMatches<SourceRecord> predicate = new TopicNameMatches<>();
predicate.config().validate(Collections.singletonMap("pattern", "my-prefix-.*"));
List<ConfigValue> configs = predicate.config().validate(Collections.singletonMap("pattern", "*"));
List<String> errorMsgs = configs.get(0).errorMessages();
assertEquals(1, errorMsgs.size());
assertTrue(errorMsgs.get(0).contains("Invalid regex"));
}
@Test
public void testTest() {
TopicNameMatches<SourceRecord> predicate = new TopicNameMatches<>();
predicate.configure(Collections.singletonMap("pattern", "my-prefix-.*"));
assertTrue(predicate.test(recordWithTopicName("my-prefix-")));
assertTrue(predicate.test(recordWithTopicName("my-prefix-foo")));
assertFalse(predicate.test(recordWithTopicName("x-my-prefix-")));
assertFalse(predicate.test(recordWithTopicName("x-my-prefix-foo")));
assertFalse(predicate.test(recordWithTopicName("your-prefix-")));
assertFalse(predicate.test(recordWithTopicName("your-prefix-foo")));
assertFalse(predicate.test(new SourceRecord(null, null, null, null, null)));
}
private SimpleConfig config(Map<String, String> props) {
return new SimpleConfig(TopicNameMatches.CONFIG_DEF, props);
}
private SourceRecord recordWithTopicName(String topicName) {
return new SourceRecord(null, null, topicName, null, null);
}
}