MINOR: update AdjustStreamThreadCountTest (#16696)

Refactor test to move off deprecated `transform()` in favor of
`process()`.

Reviewers: Bill Bejeck <bill@confluent.io>
This commit is contained in:
Matthias J. Sax 2024-07-30 12:47:51 -07:00 committed by GitHub
parent 2bf7be71ac
commit 683b8a2bee
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 20 additions and 22 deletions

View File

@ -22,7 +22,6 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreamsWrapper;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.ThreadMetadata;
@ -30,9 +29,10 @@ import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThr
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.test.TestUtils;
@ -425,7 +425,6 @@ public class AdjustStreamThreadCountTest {
}
@Test
@SuppressWarnings("deprecation")
public void shouldResizeCacheAfterThreadReplacement() throws InterruptedException {
final long totalCacheBytes = 10L;
final Properties props = new Properties();
@ -437,9 +436,12 @@ public class AdjustStreamThreadCountTest {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> stream = builder.stream(inputTopic);
stream.transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
stream.process(() -> new Processor<String, String, String, String>() {
ProcessorContext<String, String> context;
@Override
public void init(final ProcessorContext context) {
public void init(final ProcessorContext<String, String> context) {
this.context = context;
context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
if (Thread.currentThread().getName().contains("StreamThread-1") && injectError.get()) {
injectError.set(false);
@ -449,12 +451,8 @@ public class AdjustStreamThreadCountTest {
}
@Override
public KeyValue<String, String> transform(final String key, final String value) {
return new KeyValue<>(key, value);
}
@Override
public void close() {
public void process(final Record<String, String> record) {
context.forward(record);
}
});