MINOR: update StreamsGraphTest (#16741)

Refactor test to move off deprecated `transform()` in favor of `process()`.

Reviewers: Bill Bejeck <bill@confluent.io>
This commit is contained in:
Matthias J. Sax 2024-07-31 22:21:16 -07:00 committed by GitHub
parent f1ef7a5a9f
commit ed179c8ba7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 27 additions and 28 deletions

View File

@ -35,10 +35,11 @@ import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.junit.jupiter.api.Test;
@ -125,25 +126,23 @@ public class StreamsGraphTest {
final StreamsBuilder builder = new StreamsBuilder();
initializer = () -> "";
aggregator = (aggKey, value, aggregate) -> aggregate + value.length();
final TransformerSupplier<String, String, KeyValue<String, String>> transformSupplier = () -> new Transformer<String, String, KeyValue<String, String>>() {
@Override
public void init(final ProcessorContext context) {
final ProcessorSupplier<String, String, String, String> processorSupplier =
() -> new Processor<String, String, String, String>() {
private ProcessorContext<String, String> context;
@Override
public void init(final ProcessorContext<String, String> context) {
this.context = context;
}
@Override
public KeyValue<String, String> transform(final String key, final String value) {
return KeyValue.pair(key, value);
}
@Override
public void close() {
public void process(final Record<String, String> record) {
context.forward(record);
}
};
final KStream<String, String> retryStream = builder.stream("retryTopic", Consumed.with(Serdes.String(), Serdes.String()))
.transform(transformSupplier)
.process(processorSupplier)
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.aggregate(initializer,
aggregator,
@ -415,22 +414,22 @@ public class StreamsGraphTest {
private final String expectedComplexMergeOptimizeTopology = "Topologies:\n" +
" Sub-topology: 0\n" +
" Source: KSTREAM-SOURCE-0000000000 (topics: [retryTopic])\n" +
" --> KSTREAM-TRANSFORM-0000000001\n" +
" Processor: KSTREAM-TRANSFORM-0000000001 (stores: [])\n" +
" --> KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-filter\n" +
" --> KSTREAM-PROCESSOR-0000000001\n" +
" Processor: KSTREAM-PROCESSOR-0000000001 (stores: [])\n" +
" --> KSTREAM-FILTER-0000000005\n" +
" <-- KSTREAM-SOURCE-0000000000\n" +
" Processor: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-filter (stores: [])\n" +
" --> KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-sink\n" +
" <-- KSTREAM-TRANSFORM-0000000001\n" +
" Sink: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-sink (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition)\n" +
" <-- KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-filter\n" +
" Processor: KSTREAM-FILTER-0000000005 (stores: [])\n" +
" --> KSTREAM-SINK-0000000004\n" +
" <-- KSTREAM-PROCESSOR-0000000001\n" +
" Sink: KSTREAM-SINK-0000000004 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition)\n" +
" <-- KSTREAM-FILTER-0000000005\n" +
"\n" +
" Sub-topology: 1\n" +
" Source: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-source (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition])\n" +
" Source: KSTREAM-SOURCE-0000000006 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition])\n" +
" --> KSTREAM-AGGREGATE-0000000003\n" +
" Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])\n" +
" --> KTABLE-SUPPRESS-0000000007\n" +
" <-- KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition-source\n" +
" <-- KSTREAM-SOURCE-0000000006\n" +
" Source: KSTREAM-SOURCE-0000000019 (topics: [internal-topic-command])\n" +
" --> KSTREAM-PEEK-0000000020\n" +
" Processor: KTABLE-SUPPRESS-0000000007 (stores: [KTABLE-SUPPRESS-STATE-STORE-0000000008])\n" +