From 683b8a2beeb5a75ef0a2ac878cf5a963f9b9d915 Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Tue, 30 Jul 2024 12:47:51 -0700 Subject: [PATCH] MINOR: update AdjustStreamThreadCountTest (#16696) Refactor test to move off deprecated `transform()` in favor of `process()`. Reviewers: Bill Bejeck --- .../AdjustStreamThreadCountTest.java | 42 +++++++++---------- 1 file changed, 20 insertions(+), 22 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java index 16942856fb9..4feaa2c6283 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java @@ -22,7 +22,6 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreamsWrapper; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.ThreadMetadata; @@ -30,9 +29,10 @@ import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThr import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.Transformer; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.test.TestUtils; @@ -425,7 +425,6 @@ public class AdjustStreamThreadCountTest { } @Test - @SuppressWarnings("deprecation") public void shouldResizeCacheAfterThreadReplacement() throws InterruptedException { final long totalCacheBytes = 10L; final Properties props = new Properties(); @@ -437,26 +436,25 @@ public class AdjustStreamThreadCountTest { final StreamsBuilder builder = new StreamsBuilder(); final KStream stream = builder.stream(inputTopic); - stream.transform(() -> new Transformer>() { - @Override - public void init(final ProcessorContext context) { - context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> { - if (Thread.currentThread().getName().contains("StreamThread-1") && injectError.get()) { - injectError.set(false); - throw new RuntimeException("BOOM"); - } - }); - } + stream.process(() -> new Processor() { + ProcessorContext context; - @Override - public KeyValue transform(final String key, final String value) { - return new KeyValue<>(key, value); - } + @Override + public void init(final ProcessorContext context) { + this.context = context; + context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> { + if (Thread.currentThread().getName().contains("StreamThread-1") && injectError.get()) { + injectError.set(false); + throw new RuntimeException("BOOM"); + } + }); + } - @Override - public void close() { - } - }); + @Override + public void process(final Record record) { + context.forward(record); + } + }); try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props)) { addStreamStateChangeListener(kafkaStreams);