KAFKA-14671: Refactor PredicatedTransformation to not implement Transformation (#13184)

Reviewers: Christo Lolov <christololov@gmail.com>, Yash Mayya <yash.mayya@gmail.com>, Chris Egerton <chrise@aiven.io>
Author: Greg Harris · 2023-02-28 08:23:19 -08:00 · committed by GitHub
parent 8dd697b05f
commit f586fa59d3
10 changed files with 113 additions and 118 deletions


@@ -268,12 +268,14 @@ public class ConnectorConfig extends AbstractConfig {
     }
     /**
-     * Returns the initialized list of {@link Transformation} which are specified in {@link #TRANSFORMS_CONFIG}.
+     * Returns the initialized list of {@link TransformationStage} which apply the
+     * {@link Transformation transformations} and {@link Predicate predicates}
+     * as they are specified in the {@link #TRANSFORMS_CONFIG} and {@link #PREDICATES_CONFIG}
      */
-    public <R extends ConnectRecord<R>> List<Transformation<R>> transformations() {
+    public <R extends ConnectRecord<R>> List<TransformationStage<R>> transformationStages() {
         final List<String> transformAliases = getList(TRANSFORMS_CONFIG);
-        final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
+        final List<TransformationStage<R>> transformations = new ArrayList<>(transformAliases.size());
         for (String alias : transformAliases) {
             final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
@@ -281,17 +283,17 @@ public class ConnectorConfig extends AbstractConfig {
                 @SuppressWarnings("unchecked")
                 final Transformation<R> transformation = Utils.newInstance(getClass(prefix + "type"), Transformation.class);
                 Map<String, Object> configs = originalsWithPrefix(prefix);
-                Object predicateAlias = configs.remove(PredicatedTransformation.PREDICATE_CONFIG);
-                Object negate = configs.remove(PredicatedTransformation.NEGATE_CONFIG);
+                Object predicateAlias = configs.remove(TransformationStage.PREDICATE_CONFIG);
+                Object negate = configs.remove(TransformationStage.NEGATE_CONFIG);
                 transformation.configure(configs);
                 if (predicateAlias != null) {
                     String predicatePrefix = PREDICATES_PREFIX + predicateAlias + ".";
                     @SuppressWarnings("unchecked")
                     Predicate<R> predicate = Utils.newInstance(getClass(predicatePrefix + "type"), Predicate.class);
                     predicate.configure(originalsWithPrefix(predicatePrefix));
-                    transformations.add(new PredicatedTransformation<>(predicate, negate == null ? false : Boolean.parseBoolean(negate.toString()), transformation));
+                    transformations.add(new TransformationStage<>(predicate, negate == null ? false : Boolean.parseBoolean(negate.toString()), transformation));
                 } else {
-                    transformations.add(transformation);
+                    transformations.add(new TransformationStage<>(transformation));
                 }
             } catch (Exception e) {
                 throw new ConnectException(e);
@@ -321,9 +323,9 @@ public class ConnectorConfig extends AbstractConfig {
         protected ConfigDef initialConfigDef() {
             // All Transformations get these config parameters implicitly
             return super.initialConfigDef()
-                    .define(PredicatedTransformation.PREDICATE_CONFIG, Type.STRING, "", Importance.MEDIUM,
+                    .define(TransformationStage.PREDICATE_CONFIG, Type.STRING, null, Importance.MEDIUM,
                             "The alias of a predicate used to determine whether to apply this transformation.")
-                    .define(PredicatedTransformation.NEGATE_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM,
+                    .define(TransformationStage.NEGATE_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM,
                             "Whether the configured predicate should be negated.");
         }
@@ -332,8 +334,8 @@ public class ConnectorConfig extends AbstractConfig {
             return super.configDefsForClass(typeConfig)
                     .filter(entry -> {
                         // The implicit parameters mask any from the transformer with the same name
-                        if (PredicatedTransformation.PREDICATE_CONFIG.equals(entry.getKey())
-                                || PredicatedTransformation.NEGATE_CONFIG.equals(entry.getKey())) {
+                        if (TransformationStage.PREDICATE_CONFIG.equals(entry.getKey())
+                                || TransformationStage.NEGATE_CONFIG.equals(entry.getKey())) {
                             log.warn("Transformer config {} is masked by implicit config of that name",
                                     entry.getKey());
                             return false;
@@ -350,8 +352,8 @@ public class ConnectorConfig extends AbstractConfig {
         @Override
        protected void validateProps(String prefix) {
-            String prefixedNegate = prefix + PredicatedTransformation.NEGATE_CONFIG;
-            String prefixedPredicate = prefix + PredicatedTransformation.PREDICATE_CONFIG;
+            String prefixedNegate = prefix + TransformationStage.NEGATE_CONFIG;
+            String prefixedPredicate = prefix + TransformationStage.PREDICATE_CONFIG;
             if (props.containsKey(prefixedNegate) &&
                     !props.containsKey(prefixedPredicate)) {
                 throw new ConfigException("Config '" + prefixedNegate + "' was provided " +
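
For reference, a minimal sketch (not part of the commit) of the connector properties that the parsing above turns into a single predicate-guarded TransformationStage. The aliases "drop-debug" and "is-debug" are made up for the example; the key names ("transforms", "predicates", ".type", ".predicate", ".negate") and the Filter/TopicNameMatches plugin classes are existing Connect config and plugins.

import java.util.HashMap;
import java.util.Map;

Map<String, String> props = new HashMap<>();
props.put("transforms", "drop-debug");
props.put("transforms.drop-debug.type", "org.apache.kafka.connect.transforms.Filter");
props.put("transforms.drop-debug.predicate", "is-debug");
props.put("transforms.drop-debug.negate", "false");
props.put("predicates", "is-debug");
props.put("predicates.is-debug.type", "org.apache.kafka.connect.transforms.predicates.TopicNameMatches");
props.put("predicates.is-debug.pattern", "debug-.*");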


@@ -19,7 +19,6 @@ package org.apache.kafka.connect.runtime;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.errors.Stage;
-import org.apache.kafka.connect.transforms.Transformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,24 +33,24 @@ import java.util.StringJoiner;
 public class TransformationChain<R extends ConnectRecord<R>> implements AutoCloseable {
     private static final Logger log = LoggerFactory.getLogger(TransformationChain.class);
-    private final List<Transformation<R>> transformations;
+    private final List<TransformationStage<R>> transformationStages;
     private final RetryWithToleranceOperator retryWithToleranceOperator;
-    public TransformationChain(List<Transformation<R>> transformations, RetryWithToleranceOperator retryWithToleranceOperator) {
-        this.transformations = transformations;
+    public TransformationChain(List<TransformationStage<R>> transformationStages, RetryWithToleranceOperator retryWithToleranceOperator) {
+        this.transformationStages = transformationStages;
         this.retryWithToleranceOperator = retryWithToleranceOperator;
     }
     public R apply(R record) {
-        if (transformations.isEmpty()) return record;
+        if (transformationStages.isEmpty()) return record;
-        for (final Transformation<R> transformation : transformations) {
+        for (final TransformationStage<R> transformationStage : transformationStages) {
             final R current = record;
             log.trace("Applying transformation {} to {}",
-                    transformation.getClass().getName(), record);
+                    transformationStage.transformClass().getName(), record);
             // execute the operation
-            record = retryWithToleranceOperator.execute(() -> transformation.apply(current), Stage.TRANSFORMATION, transformation.getClass());
+            record = retryWithToleranceOperator.execute(() -> transformationStage.apply(current), Stage.TRANSFORMATION, transformationStage.transformClass());
             if (record == null) break;
         }
@@ -61,8 +60,8 @@ public class TransformationChain<R extends ConnectRecord<R>> implements AutoClos
     @Override
     public void close() {
-        for (Transformation<R> transformation : transformations) {
-            transformation.close();
+        for (TransformationStage<R> transformationStage : transformationStages) {
+            transformationStage.close();
         }
     }
@@ -71,18 +70,18 @@ public class TransformationChain<R extends ConnectRecord<R>> implements AutoClos
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         TransformationChain<?> that = (TransformationChain<?>) o;
-        return Objects.equals(transformations, that.transformations);
+        return Objects.equals(transformationStages, that.transformationStages);
     }
     @Override
     public int hashCode() {
-        return Objects.hash(transformations);
+        return Objects.hash(transformationStages);
     }
     public String toString() {
         StringJoiner chain = new StringJoiner(", ", getClass().getName() + "{", "}");
-        for (Transformation<R> transformation : transformations) {
-            chain.add(transformation.getClass().getName());
+        for (TransformationStage<R> transformationStage : transformationStages) {
+            chain.add(transformationStage.transformClass().getName());
         }
         return chain.toString();
     }


@@ -16,65 +16,60 @@
  */
 package org.apache.kafka.connect.runtime;
-import java.util.Map;
-import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.ConnectRecord;
-import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.transforms.Transformation;
 import org.apache.kafka.connect.transforms.predicates.Predicate;
 /**
- * Decorator for a {@link Transformation} which applies the delegate only when a
- * {@link Predicate} is true (or false, according to {@code negate}).
+ * Wrapper for a {@link Transformation} and corresponding optional {@link Predicate}
+ * which applies the transformation when the {@link Predicate} is true (or false, according to {@code negate}).
+ * If no {@link Predicate} is provided, the transformation will be unconditionally applied.
  * @param <R> The type of record (must be an implementation of {@link ConnectRecord})
  */
-public class PredicatedTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
+public class TransformationStage<R extends ConnectRecord<R>> implements AutoCloseable {
     static final String PREDICATE_CONFIG = "predicate";
     static final String NEGATE_CONFIG = "negate";
-    final Predicate<R> predicate;
-    final Transformation<R> delegate;
-    final boolean negate;
+    private final Predicate<R> predicate;
+    private final Transformation<R> transformation;
+    private final boolean negate;
-    PredicatedTransformation(Predicate<R> predicate, boolean negate, Transformation<R> delegate) {
+    TransformationStage(Transformation<R> transformation) {
+        this(null, false, transformation);
+    }
+    TransformationStage(Predicate<R> predicate, boolean negate, Transformation<R> transformation) {
         this.predicate = predicate;
         this.negate = negate;
-        this.delegate = delegate;
+        this.transformation = transformation;
     }
-    @Override
-    public void configure(Map<String, ?> configs) {
-        throw new ConnectException(PredicatedTransformation.class.getName() + ".configure() " +
-                "should never be called directly.");
+    public Class<? extends Transformation<R>> transformClass() {
+        @SuppressWarnings("unchecked")
+        Class<? extends Transformation<R>> transformClass = (Class<? extends Transformation<R>>) transformation.getClass();
+        return transformClass;
     }
-    @Override
     public R apply(R record) {
-        if (negate ^ predicate.test(record)) {
-            return delegate.apply(record);
+        if (predicate == null || negate ^ predicate.test(record)) {
+            return transformation.apply(record);
         }
         return record;
     }
-    @Override
-    public ConfigDef config() {
-        throw new ConnectException(PredicatedTransformation.class.getName() + ".config() " +
-                "should never be called directly.");
-    }
     @Override
     public void close() {
-        Utils.closeQuietly(delegate, "predicated transformation");
+        Utils.closeQuietly(transformation, "transformation");
         Utils.closeQuietly(predicate, "predicate");
     }
     @Override
     public String toString() {
-        return "PredicatedTransformation{" +
+        return "TransformationStage{" +
                 "predicate=" + predicate +
-                ", delegate=" + delegate +
+                ", transformation=" + transformation +
                 ", negate=" + negate +
                 '}';
     }
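
For context, a minimal sketch (not part of the commit) of how the new wrapper behaves. Filter and TopicNameMatches are existing Connect plugins used here purely for illustration; because the constructors above are package-private, such code would have to live in org.apache.kafka.connect.runtime.

package org.apache.kafka.connect.runtime;

import java.util.Collections;

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.Filter;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import org.apache.kafka.connect.transforms.predicates.TopicNameMatches;

public class TransformationStageExample {
    public static void main(String[] args) {
        // A transformation and a predicate, configured the same way ConnectorConfig does it.
        Transformation<SourceRecord> smt = new Filter<>();
        smt.configure(Collections.emptyMap());

        Predicate<SourceRecord> isDebug = new TopicNameMatches<>();
        isDebug.configure(Collections.singletonMap("pattern", "debug-.*"));

        // Equivalent to what transformationStages() builds for a "transforms.<alias>.predicate" entry.
        TransformationStage<SourceRecord> stage = new TransformationStage<>(isDebug, false, smt);

        SourceRecord record = new SourceRecord(null, null, "debug-topic", null, "value");
        // The predicate matches "debug-topic", so Filter runs and drops the record (returns null).
        System.out.println(stage.apply(record));

        stage.close(); // closes both the transformation and the predicate
    }
}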


@@ -1253,7 +1253,7 @@ public class Worker {
                                                  Class<? extends Connector> connectorClass,
                                                  RetryWithToleranceOperator retryWithToleranceOperator) {
-        TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connectorConfig.<SinkRecord>transformations(), retryWithToleranceOperator);
+        TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connectorConfig.<SinkRecord>transformationStages(), retryWithToleranceOperator);
         log.info("Initializing: {}", transformationChain);
         SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connectorConfig.originalsStrings());
         retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
@@ -1297,7 +1297,7 @@
         SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins,
                 connectorConfig.originalsStrings(), config.topicCreationEnable());
         retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics));
-        TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformations(), retryWithToleranceOperator);
+        TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformationStages(), retryWithToleranceOperator);
         log.info("Initializing: {}", transformationChain);
         Map<String, Object> producerProps = baseProducerConfigs(id.connector(), "connector-producer-" + id, config, sourceConfig, connectorClass,
@@ -1365,7 +1365,7 @@
         SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins,
                 connectorConfig.originalsStrings(), config.topicCreationEnable());
         retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics));
-        TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformations(), retryWithToleranceOperator);
+        TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformationStages(), retryWithToleranceOperator);
         log.info("Initializing: {}", transformationChain);
         Map<String, Object> producerProps = exactlyOnceSourceTaskProducerConfigs(


@@ -20,7 +20,6 @@ import io.swagger.v3.oas.annotations.Operation;
 import io.swagger.v3.oas.annotations.Parameter;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
-import org.apache.kafka.connect.runtime.PredicatedTransformation;
 import org.apache.kafka.connect.runtime.isolation.PluginDesc;
 import org.apache.kafka.connect.runtime.isolation.PluginType;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
@@ -34,7 +33,6 @@ import org.apache.kafka.connect.tools.MockSourceConnector;
 import org.apache.kafka.connect.tools.SchemaSourceConnector;
 import org.apache.kafka.connect.tools.VerifiableSinkConnector;
 import org.apache.kafka.connect.tools.VerifiableSourceConnector;
-import org.apache.kafka.connect.transforms.Transformation;
 import org.apache.kafka.connect.util.FutureCallback;
 import javax.ws.rs.BadRequestException;
@@ -79,11 +77,6 @@ public class ConnectorPluginsResource implements ConnectResource {
             SchemaSourceConnector.class
     );
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    static final List<Class<? extends Transformation<?>>> TRANSFORM_EXCLUDES = Collections.singletonList(
-            (Class) PredicatedTransformation.class
-    );
     public ConnectorPluginsResource(Herder herder) {
         this.herder = herder;
         this.connectorPlugins = new ArrayList<>();
@@ -92,7 +85,7 @@
         // TODO: improve once plugins are allowed to be added/removed during runtime.
         addConnectorPlugins(herder.plugins().sinkConnectors(), SINK_CONNECTOR_EXCLUDES);
         addConnectorPlugins(herder.plugins().sourceConnectors(), SOURCE_CONNECTOR_EXCLUDES);
-        addConnectorPlugins(herder.plugins().transformations(), TRANSFORM_EXCLUDES);
+        addConnectorPlugins(herder.plugins().transformations(), Collections.emptySet());
         addConnectorPlugins(herder.plugins().predicates(), Collections.emptySet());
         addConnectorPlugins(herder.plugins().converters(), Collections.emptySet());
         addConnectorPlugins(herder.plugins().headerConverters(), Collections.emptySet());


@@ -22,6 +22,7 @@ import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.runtime.isolation.PluginDesc;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.transforms.Transformation;
 import org.apache.kafka.connect.transforms.predicates.Predicate;
 import org.junit.Test;
@@ -48,6 +49,8 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
         }
     };
+    private static final SinkRecord DUMMY_RECORD = new SinkRecord(null, 0, null, null, null, null, 0L);
     public static abstract class TestConnector extends Connector {
     }
@@ -62,7 +65,7 @@
         @Override
         public R apply(R record) {
-            return null;
+            return record.newRecord(null, magicNumber, null, null, null, null, 0L);
         }
         @Override
@@ -147,10 +150,11 @@
         props.put("transforms.a.type", SimpleTransformation.class.getName());
         props.put("transforms.a.magic.number", "42");
         final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, props);
-        final List<Transformation<R>> transformations = config.transformations();
-        assertEquals(1, transformations.size());
-        final SimpleTransformation<R> xform = (SimpleTransformation<R>) transformations.get(0);
-        assertEquals(42, xform.magicNumber);
+        final List<TransformationStage<SinkRecord>> transformationStages = config.transformationStages();
+        assertEquals(1, transformationStages.size());
+        final TransformationStage<SinkRecord> stage = transformationStages.get(0);
+        assertEquals(SimpleTransformation.class, stage.transformClass());
+        assertEquals(42, stage.apply(DUMMY_RECORD).kafkaPartition().intValue());
     }
     @Test
@@ -175,10 +179,10 @@
         props.put("transforms.b.type", SimpleTransformation.class.getName());
         props.put("transforms.b.magic.number", "84");
         final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, props);
-        final List<Transformation<R>> transformations = config.transformations();
-        assertEquals(2, transformations.size());
-        assertEquals(42, ((SimpleTransformation<R>) transformations.get(0)).magicNumber);
-        assertEquals(84, ((SimpleTransformation<R>) transformations.get(1)).magicNumber);
+        final List<TransformationStage<SinkRecord>> transformationStages = config.transformationStages();
+        assertEquals(2, transformationStages.size());
+        assertEquals(42, transformationStages.get(0).apply(DUMMY_RECORD).kafkaPartition().intValue());
+        assertEquals(84, transformationStages.get(1).apply(DUMMY_RECORD).kafkaPartition().intValue());
     }
     @Test
@@ -246,7 +250,7 @@
         props.put("predicates", "my-pred");
         props.put("predicates.my-pred.type", TestPredicate.class.getName());
         props.put("predicates.my-pred.int", "84");
-        assertPredicatedTransform(props, true);
+        assertTransformationStageWithPredicate(props, true);
     }
     @Test
@@ -261,7 +265,7 @@
         props.put("predicates", "my-pred");
         props.put("predicates.my-pred.type", TestPredicate.class.getName());
         props.put("predicates.my-pred.int", "84");
-        assertPredicatedTransform(props, false);
+        assertTransformationStageWithPredicate(props, false);
     }
     @Test
@@ -280,25 +284,19 @@
         assertTrue(e.getMessage().contains("Predicate is abstract and cannot be created"));
     }
-    private void assertPredicatedTransform(Map<String, String> props, boolean expectedNegated) {
+    private void assertTransformationStageWithPredicate(Map<String, String> props, boolean expectedNegated) {
         final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, props);
-        final List<Transformation<R>> transformations = config.transformations();
-        assertEquals(1, transformations.size());
-        assertTrue(transformations.get(0) instanceof PredicatedTransformation);
-        PredicatedTransformation<?> predicated = (PredicatedTransformation<?>) transformations.get(0);
-        assertEquals(expectedNegated, predicated.negate);
-        assertTrue(predicated.delegate instanceof ConnectorConfigTest.SimpleTransformation);
-        assertEquals(42, ((SimpleTransformation<?>) predicated.delegate).magicNumber);
-        assertTrue(predicated.predicate instanceof ConnectorConfigTest.TestPredicate);
-        assertEquals(84, ((TestPredicate<?>) predicated.predicate).param);
-        predicated.close();
-        assertEquals(0, ((SimpleTransformation<?>) predicated.delegate).magicNumber);
-        assertEquals(0, ((TestPredicate<?>) predicated.predicate).param);
+        final List<TransformationStage<SinkRecord>> transformationStages = config.transformationStages();
+        assertEquals(1, transformationStages.size());
+        TransformationStage<SinkRecord> stage = transformationStages.get(0);
+        assertEquals(expectedNegated ? 42 : 0, stage.apply(DUMMY_RECORD).kafkaPartition().intValue());
+        SinkRecord matchingRecord = DUMMY_RECORD.newRecord(null, 84, null, null, null, null, 0L);
+        assertEquals(expectedNegated ? 84 : 42, stage.apply(matchingRecord).kafkaPartition().intValue());
+        assertEquals(SimpleTransformation.class, stage.transformClass());
+        stage.close();
     }
     @Test
@@ -381,7 +379,7 @@
         @Override
         public boolean test(R record) {
-            return false;
+            return record.kafkaPartition() == param;
         }
         @Override
@@ -445,8 +443,8 @@
         props.put(prefix + "type", HasDuplicateConfigTransformation.class.getName());
         ConfigDef def = ConnectorConfig.enrich(MOCK_PLUGINS, new ConfigDef(), props, false);
         assertEnrichedConfigDef(def, prefix, HasDuplicateConfigTransformation.MUST_EXIST_KEY, ConfigDef.Type.BOOLEAN);
-        assertEnrichedConfigDef(def, prefix, PredicatedTransformation.PREDICATE_CONFIG, ConfigDef.Type.STRING);
-        assertEnrichedConfigDef(def, prefix, PredicatedTransformation.NEGATE_CONFIG, ConfigDef.Type.BOOLEAN);
+        assertEnrichedConfigDef(def, prefix, TransformationStage.PREDICATE_CONFIG, ConfigDef.Type.STRING);
+        assertEnrichedConfigDef(def, prefix, TransformationStage.NEGATE_CONFIG, ConfigDef.Type.BOOLEAN);
     }
     private static void assertEnrichedConfigDef(ConfigDef def, String prefix, String keyName, ConfigDef.Type expectedType) {
@@ -460,9 +458,9 @@
         private static final String MUST_EXIST_KEY = "must.exist.key";
         private static final ConfigDef CONFIG_DEF = new ConfigDef()
                 // this configDef is duplicate. It should be removed automatically so as to avoid duplicate config error.
-                .define(PredicatedTransformation.PREDICATE_CONFIG, ConfigDef.Type.INT, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "fake")
+                .define(TransformationStage.PREDICATE_CONFIG, ConfigDef.Type.INT, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "fake")
                 // this configDef is duplicate. It should be removed automatically so as to avoid duplicate config error.
-                .define(PredicatedTransformation.NEGATE_CONFIG, ConfigDef.Type.INT, 123, ConfigDef.Importance.MEDIUM, "fake")
+                .define(TransformationStage.NEGATE_CONFIG, ConfigDef.Type.INT, 123, ConfigDef.Importance.MEDIUM, "fake")
                 // this configDef should appear if above duplicate configDef is removed without any error
                 .define(MUST_EXIST_KEY, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, "this key must exist");


@@ -502,7 +502,7 @@ public class ErrorHandlingTaskTest {
         converter.configure(oo);
         TransformationChain<SinkRecord> sinkTransforms =
-                new TransformationChain<>(singletonList(new FaultyPassthrough<SinkRecord>()), retryWithToleranceOperator);
+                new TransformationChain<>(singletonList(new TransformationStage<>(new FaultyPassthrough<SinkRecord>())), retryWithToleranceOperator);
         workerSinkTask = new WorkerSinkTask(
                 taskId, sinkTask, statusListener, initialState, workerConfig,
@@ -532,7 +532,7 @@
     }
     private void createSourceTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator, Converter converter) {
-        TransformationChain<SourceRecord> sourceTransforms = new TransformationChain<>(singletonList(new FaultyPassthrough<SourceRecord>()), retryWithToleranceOperator);
+        TransformationChain<SourceRecord> sourceTransforms = new TransformationChain<>(singletonList(new TransformationStage<>(new FaultyPassthrough<SourceRecord>())), retryWithToleranceOperator);
         workerSourceTask = spy(new WorkerSourceTask(
                 taskId, sourceTask, statusListener, initialState, converter,


@@ -17,13 +17,18 @@
 package org.apache.kafka.connect.runtime;
 import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.predicates.Predicate;
 import org.junit.Test;
 import static java.util.Collections.singletonMap;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
-public class PredicatedTransformationTest {
+public class TransformationStageTest {
     private final SourceRecord initial = new SourceRecord(singletonMap("initial", 1), null, null, null, null);
     private final SourceRecord transformed = new SourceRecord(singletonMap("transformed", 2), null, null, null, null);
@@ -39,17 +44,21 @@ public class PredicatedTransformationTest {
     private void applyAndAssert(boolean predicateResult, boolean negate,
                                 SourceRecord expectedResult) {
-        SamplePredicate predicate = new SamplePredicate(predicateResult);
-        SampleTransformation<SourceRecord> predicatedTransform = new SampleTransformation<>(transformed);
-        PredicatedTransformation<SourceRecord> pt = new PredicatedTransformation<>(
+        @SuppressWarnings("unchecked")
+        Predicate<SourceRecord> predicate = mock(Predicate.class);
+        when(predicate.test(any())).thenReturn(predicateResult);
+        @SuppressWarnings("unchecked")
+        Transformation<SourceRecord> transformation = mock(Transformation.class);
+        when(transformation.apply(any())).thenReturn(transformed);
+        TransformationStage<SourceRecord> stage = new TransformationStage<>(
                 predicate,
                 negate,
-                predicatedTransform);
+                transformation);
-        assertEquals(expectedResult, pt.apply(initial));
-        pt.close();
-        assertTrue(predicate.closed);
-        assertTrue(predicatedTransform.closed);
+        assertEquals(expectedResult, stage.apply(initial));
+        stage.close();
+        verify(predicate).close();
+        verify(transformation).close();
     }
 }


@@ -387,8 +387,7 @@ public class ConnectorPluginsResourceTest {
     public void testListAllPlugins() {
         Set<Class<?>> excludes = Stream.of(
                 ConnectorPluginsResource.SINK_CONNECTOR_EXCLUDES,
-                ConnectorPluginsResource.SOURCE_CONNECTOR_EXCLUDES,
-                ConnectorPluginsResource.TRANSFORM_EXCLUDES
+                ConnectorPluginsResource.SOURCE_CONNECTOR_EXCLUDES
         ).flatMap(Collection::stream)
                 .collect(Collectors.toSet());
         Set<PluginInfo> expectedConnectorPlugins = Stream.of(


@@ -19,6 +19,7 @@ package org.apache.kafka.connect.util;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.runtime.TransformationStage;
 import org.apache.kafka.connect.runtime.SourceConnectorConfig;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
@@ -26,7 +27,6 @@ import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.storage.StringConverter;
 import org.apache.kafka.connect.transforms.Cast;
 import org.apache.kafka.connect.transforms.RegexRouter;
-import org.apache.kafka.connect.transforms.Transformation;
 import org.junit.Before;
 import org.junit.Test;
@@ -516,9 +516,9 @@ public class TopicCreationTest {
         topicCreation.addTopic(FOO_TOPIC);
         assertFalse(topicCreation.isTopicCreationRequired(FOO_TOPIC));
-        List<Transformation<SourceRecord>> transformations = sourceConfig.transformations();
-        assertEquals(1, transformations.size());
-        Cast<SourceRecord> xform = (Cast<SourceRecord>) transformations.get(0);
+        List<TransformationStage<SourceRecord>> transformationStages = sourceConfig.transformationStages();
+        assertEquals(1, transformationStages.size());
+        TransformationStage<SourceRecord> xform = transformationStages.get(0);
         SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, null, Schema.INT8_SCHEMA, 42));
         assertEquals(Schema.Type.INT8, transformed.valueSchema().type());
         assertEquals((byte) 42, transformed.value());
@@ -623,15 +623,15 @@
         assertEquals(barPartitions, barTopicSpec.numPartitions());
         assertThat(barTopicSpec.configs(), is(barTopicProps));
-        List<Transformation<SourceRecord>> transformations = sourceConfig.transformations();
-        assertEquals(2, transformations.size());
-        Cast<SourceRecord> castXForm = (Cast<SourceRecord>) transformations.get(0);
+        List<TransformationStage<SourceRecord>> transformationStages = sourceConfig.transformationStages();
+        assertEquals(2, transformationStages.size());
+        TransformationStage<SourceRecord> castXForm = transformationStages.get(0);
         SourceRecord transformed = castXForm.apply(new SourceRecord(null, null, "topic", 0, null, null, Schema.INT8_SCHEMA, 42));
         assertEquals(Schema.Type.INT8, transformed.valueSchema().type());
         assertEquals((byte) 42, transformed.value());
-        RegexRouter<SourceRecord> regexRouterXForm = (RegexRouter<SourceRecord>) transformations.get(1);
+        TransformationStage<SourceRecord> regexRouterXForm = transformationStages.get(1);
         transformed = regexRouterXForm.apply(new SourceRecord(null, null, "topic", 0, null, null, Schema.INT8_SCHEMA, 42));
         assertEquals("prefix-topic", transformed.topic());
     }