diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java index 07293d2d4b9..6a055f51cf9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java @@ -32,11 +32,12 @@ import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.TimeWindowedDeserializer; -import org.apache.kafka.streams.kstream.Transformer; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.TimeWindow; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.query.Position; @@ -102,7 +103,7 @@ public class CachingPersistentWindowStoreTest { private static final String TOPIC = "topic"; private static final String CACHE_NAMESPACE = "0_0-store-name"; - private InternalMockProcessorContext context; + private InternalMockProcessorContext context; private RocksDBSegmentedBytesStore bytesStore; private WindowStore underlyingStore; private CachingWindowStore cachingStore; @@ -138,8 +139,8 @@ public class CachingPersistentWindowStoreTest { final WindowStore inner = mock(WindowStore.class); final CachingWindowStore outer = new CachingWindowStore(inner, WINDOW_SIZE, SEGMENT_INTERVAL); when(inner.name()).thenReturn("store"); - outer.init((ProcessorContext) context, outer); - verify(inner).init((ProcessorContext) context, outer); + outer.init((org.apache.kafka.streams.processor.ProcessorContext) context, outer); + verify(inner).init((org.apache.kafka.streams.processor.ProcessorContext) context, outer); } @SuppressWarnings("unchecked") @@ -153,30 +154,28 @@ public class CachingPersistentWindowStoreTest { } @Test - @SuppressWarnings("deprecation") public void shouldNotReturnDuplicatesInRanges() { final StreamsBuilder builder = new StreamsBuilder(); final StoreBuilder> storeBuilder = Stores.windowStoreBuilder( - Stores.persistentWindowStore("store-name", ofHours(1L), ofMinutes(1L), false), - Serdes.String(), - Serdes.String()) + Stores.persistentWindowStore("store-name", ofHours(1L), ofMinutes(1L), false), + Serdes.String(), + Serdes.String()) .withCachingEnabled(); builder.addStateStore(storeBuilder); builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String())) - .transform(() -> new Transformer>() { + .process(() -> new Processor() { private WindowStore store; private int numRecordsProcessed; - private ProcessorContext context; + private ProcessorContext context; - @SuppressWarnings("unchecked") @Override - public void init(final ProcessorContext processorContext) { + public void init(final ProcessorContext processorContext) { this.context = processorContext; - this.store = (WindowStore) processorContext.getStateStore("store-name"); + this.store = processorContext.getStateStore("store-name"); int count = 0; try (final KeyValueIterator, String> all = store.all()) { @@ -190,7 +189,7 @@ public class CachingPersistentWindowStoreTest { } @Override - public KeyValue transform(final String key, final String value) { + public void process(final Record record) { int count = 0; try (final KeyValueIterator, String> all = store.all()) { @@ -202,22 +201,18 @@ public class CachingPersistentWindowStoreTest { assertThat(count, equalTo(numRecordsProcessed)); - store.put(value, value, context.timestamp()); + store.put(record.value(), record.value(), record.timestamp()); numRecordsProcessed++; - return new KeyValue<>(key, value); - } - - @Override - public void close() { + context.forward(record); } }, "store-name"); final Properties streamsConfiguration = new Properties(); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000L);