diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java index 087a439d084..8653f69fca6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java @@ -40,9 +40,9 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils.StableAssignmentListener; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.Transformer; -import org.apache.kafka.streams.kstream.TransformerSupplier; -import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; @@ -847,7 +847,6 @@ public class EosV2UpgradeIntegrationTest { } } - @SuppressWarnings("deprecation") private KafkaStreams getKafkaStreams(final String appDir, final String processingGuarantee, final boolean injectError) { @@ -861,62 +860,58 @@ public class EosV2UpgradeIntegrationTest { builder.addStateStore(storeBuilder); final KStream input = builder.stream(MULTI_PARTITION_INPUT_TOPIC); - input.transform(new TransformerSupplier>() { - @Override - public Transformer> get() { - return new Transformer>() { - ProcessorContext context; - KeyValueStore state = null; - AtomicBoolean crash; - AtomicInteger sharedCommit; + input.process(() -> new Processor() { + ProcessorContext context; + KeyValueStore state = null; + AtomicBoolean crash; + AtomicInteger sharedCommit; - @Override - public void init(final ProcessorContext context) { - this.context = context; - state = context.getStateStore(storeName); - final String clientId = context.appConfigs().get(StreamsConfig.CLIENT_ID_CONFIG).toString(); - if (APP_DIR_1.equals(clientId)) { - crash = errorInjectedClient1; - sharedCommit = commitCounterClient1; - } else { - crash = errorInjectedClient2; - sharedCommit = commitCounterClient2; + @Override + public void init(final ProcessorContext context) { + this.context = context; + state = context.getStateStore(storeName); + final String clientId = context.appConfigs().get(StreamsConfig.CLIENT_ID_CONFIG).toString(); + if (APP_DIR_1.equals(clientId)) { + crash = errorInjectedClient1; + sharedCommit = commitCounterClient1; + } else { + crash = errorInjectedClient2; + sharedCommit = commitCounterClient2; + } + } + + @Override + public void process(final Record record) { + final long key = record.key(); + final long value = record.value(); + + if ((value + 1) % 10 == 0) { + if (sharedCommit.get() < 0 || + sharedCommit.incrementAndGet() == 2) { + + context.commit(); } + commitRequested.incrementAndGet(); } - @Override - public KeyValue transform(final Long key, final Long value) { - if ((value + 1) % 10 == 0) { - if (sharedCommit.get() < 0 || - sharedCommit.incrementAndGet() == 2) { + Long sum = state.get(key); + if (sum == null) { + sum = value; + } else { + sum += value; + } + state.put(key, sum); + state.flush(); - context.commit(); - } - commitRequested.incrementAndGet(); - } - - Long sum = state.get(key); - if (sum == null) { - sum = value; - } else { - sum += value; - } - state.put(key, sum); - state.flush(); - - if (value % 10 == 4 && // potentially crash when processing 5th, 15th, or 25th record (etc.) - crash != null && crash.compareAndSet(true, false)) { - // only crash a single task - throw new RuntimeException("Injected test exception."); - } - - return new KeyValue<>(key, state.get(key)); + if (value % 10 == 4 && // potentially crash when processing 5th, 15th, or 25th record (etc.) + crash != null && crash.compareAndSet(true, false)) { + // only crash a single task + throw new RuntimeException("Injected test exception."); } - @Override - public void close() {} - }; - } }, storeNames) + context.forward(record.withValue(state.get(key))); + } + }, storeNames) .to(MULTI_PARTITION_OUTPUT_TOPIC); final Properties properties = new Properties();