diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java index 188c8a44e9d..5a4a16fa94f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java @@ -35,10 +35,11 @@ import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Suppressed; import org.apache.kafka.streams.kstream.TimeWindows; -import org.apache.kafka.streams.kstream.Transformer; -import org.apache.kafka.streams.kstream.TransformerSupplier; import org.apache.kafka.streams.kstream.ValueJoiner; -import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; import org.junit.jupiter.api.Test; @@ -125,25 +126,23 @@ public class StreamsGraphTest { final StreamsBuilder builder = new StreamsBuilder(); initializer = () -> ""; aggregator = (aggKey, value, aggregate) -> aggregate + value.length(); - final TransformerSupplier> transformSupplier = () -> new Transformer>() { - @Override - public void init(final ProcessorContext context) { + final ProcessorSupplier processorSupplier = + () -> new Processor() { + private ProcessorContext context; - } + @Override + public void init(final ProcessorContext context) { + this.context = context; + } - @Override - public KeyValue transform(final String key, final String value) { - return KeyValue.pair(key, value); - } - - @Override - public void close() { - - } - }; + @Override + public void process(final Record record) { + context.forward(record); + } + }; final KStream retryStream = builder.stream("retryTopic", Consumed.with(Serdes.String(), Serdes.String())) - .transform(transformSupplier) + .process(processorSupplier) .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) .aggregate(initializer, aggregator, @@ -415,22 +414,22 @@ public class StreamsGraphTest { private final String expectedComplexMergeOptimizeTopology = "Topologies:\n" + " Sub-topology: 0\n" + " Source: KSTREAM-SOURCE-0000000000 (topics: [retryTopic])\n" + - " --> KSTREAM-TRANSFORM-0000000001\n" + - " Processor: KSTREAM-TRANSFORM-0000000001 (stores: [])\n" + - " --> KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-filter\n" + + " --> KSTREAM-PROCESSOR-0000000001\n" + + " Processor: KSTREAM-PROCESSOR-0000000001 (stores: [])\n" + + " --> KSTREAM-FILTER-0000000005\n" + " <-- KSTREAM-SOURCE-0000000000\n" + - " Processor: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-filter (stores: [])\n" + - " --> KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-sink\n" + - " <-- KSTREAM-TRANSFORM-0000000001\n" + - " Sink: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-sink (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition)\n" + - " <-- KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-filter\n" + + " Processor: KSTREAM-FILTER-0000000005 (stores: [])\n" + + " --> KSTREAM-SINK-0000000004\n" + + " <-- KSTREAM-PROCESSOR-0000000001\n" + + " Sink: KSTREAM-SINK-0000000004 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition)\n" + + " <-- KSTREAM-FILTER-0000000005\n" + "\n" + " Sub-topology: 1\n" + - " Source: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-source (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition])\n" + + " Source: KSTREAM-SOURCE-0000000006 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition])\n" + " --> KSTREAM-AGGREGATE-0000000003\n" + " Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])\n" + " --> KTABLE-SUPPRESS-0000000007\n" + - " <-- KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-source\n" + + " <-- KSTREAM-SOURCE-0000000006\n" + " Source: KSTREAM-SOURCE-0000000019 (topics: [internal-topic-command])\n" + " --> KSTREAM-PEEK-0000000020\n" + " Processor: KTABLE-SUPPRESS-0000000007 (stores: [KTABLE-SUPPRESS-STATE-STORE-0000000008])\n" +