diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
index 5611fe99d24..4f7ca5e59ae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.streams.errors.ProcessorStateException;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.query.Position;
 
 import org.slf4j.Logger;
@@ -81,7 +81,7 @@ abstract class AbstractSegments<S extends Segment> implements Segments<S> {
 
     @Override
     public S getOrCreateSegmentIfLive(final long segmentId,
-                                      final ProcessorContext context,
+                                      final StateStoreContext context,
                                       final long streamTime) {
         final long minLiveTimestamp = streamTime - retentionPeriod;
         final long minLiveSegment = segmentId(minLiveTimestamp);
@@ -95,7 +95,7 @@
     }
 
     @Override
-    public void openExisting(final ProcessorContext context, final long streamTime) {
+    public void openExisting(final StateStoreContext context, final long streamTime) {
         try {
             final File dir = new File(context.stateDir(), name);
             if (dir.exists()) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
index 304d77e8259..a18d901b83f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
 
@@ -37,7 +37,7 @@ class KeyValueSegments extends AbstractSegments<KeyValueSegment> {
 
     @Override
     public KeyValueSegment getOrCreateSegment(final long segmentId,
-                                              final ProcessorContext context) {
+                                              final StateStoreContext context) {
         if (segments.containsKey(segmentId)) {
             return segments.get(segmentId);
         } else {
@@ -55,7 +55,7 @@
 
     @Override
     public KeyValueSegment getOrCreateSegmentIfLive(final long segmentId,
-                                                    final ProcessorContext context,
+                                                    final StateStoreContext context,
                                                     final long streamTime) {
         final KeyValueSegment segment = super.getOrCreateSegmentIfLive(segmentId, context, streamTime);
         cleanupExpiredSegments(streamTime);
@@ -63,7 +63,7 @@
     }
 
     @Override
-    public void openExisting(final ProcessorContext context, final long streamTime) {
+    public void openExisting(final StateStoreContext context, final long streamTime) {
         metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), context.taskId());
         super.openExisting(context, streamTime);
    }
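Only the context parameter type changes in `getOrCreateSegmentIfLive`; the liveness logic is untouched. `AbstractSegments` maps a timestamp to a segment id by integer division (`segmentId(t) = t / segmentInterval`), so a segment is live exactly when its id is at least `segmentId(streamTime - retentionPeriod)`. A standalone sketch of that arithmetic follows; the class name and the interval/retention values are made up for illustration:

```java
// Standalone illustration of the segment-liveness check performed in
// AbstractSegments. The constants below are hypothetical; in the real store
// the segment interval is derived from the retention period and segment count.
public class SegmentLivenessExample {

    static long segmentId(final long timestamp, final long segmentIntervalMs) {
        return timestamp / segmentIntervalMs;
    }

    public static void main(final String[] args) {
        final long segmentIntervalMs = 60_000L;   // one segment per minute
        final long retentionPeriodMs = 300_000L;  // keep five minutes of data
        final long streamTime = 1_000_000L;       // observed stream time

        // Mirrors the two lines visible in the hunk above.
        final long minLiveTimestamp = streamTime - retentionPeriodMs;
        final long minLiveSegment = segmentId(minLiveTimestamp, segmentIntervalMs);

        // A segment is fetched or created only if it is still within retention.
        final long candidateSegment = segmentId(950_000L, segmentIntervalMs);
        if (candidateSegment >= minLiveSegment) {
            System.out.println("segment " + candidateSegment + " is live");
        } else {
            System.out.println("segment " + candidateSegment + " has expired");
        }
    }
}
```

With these numbers the retention cutoff is 700 000 ms, which falls in segment 11, so segment 15 (covering 900 000 to 960 000 ms) is still live.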
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
index bcbeb4689b3..c46a2c2788c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
@@ -29,8 +29,8 @@ import java.util.Map;
  * Regular segments with {@code segmentId >= 0} expire according to the specified
  * retention period. "Reserved" segments with {@code segmentId < 0} do not expire
  * and are completely separate from regular segments in that methods such as
- * {@link #segmentForTimestamp(long)}, {@link #getOrCreateSegment(long, ProcessorContext)},
- * {@link #getOrCreateSegmentIfLive(long, ProcessorContext, long)},
+ * {@link #segmentForTimestamp(long)}, {@link #getOrCreateSegment(long, StateStoreContext)},
+ * {@link #getOrCreateSegmentIfLive(long, StateStoreContext, long)},
  * {@link #segments(long, long, boolean)}, and {@link #allSegments(boolean)}
  * only return regular segments and not reserved segments. The methods {@link #flush()}
  * and {@link #close()} flush and close both regular and reserved segments, due to
@@ -62,7 +62,7 @@ public class LogicalKeyValueSegments extends AbstractSegments<LogicalKeyValueSegment> {
 
     @Override
     public LogicalKeyValueSegment getOrCreateSegment(final long segmentId,
-                                                     final ProcessorContext context) {
+                                                     final StateStoreContext context) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java
     private InternalProcessorContext<?, ?> internalProcessorContext;
     private Sensor expiredRecordSensor;
     private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
     private boolean consistencyEnabled = false;
@@ -489,7 +488,7 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte[]> {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java
         final List<WriteBufferSegmentWithDbFallback> allSegments = restoreClient.reversedSegments(Long.MIN_VALUE);
-        if (allSegments.size() > 0) {
+        if (!allSegments.isEmpty()) {
             // collect entries into write batch
             for (final WriteBufferSegmentWithDbFallback bufferSegment : allSegments) {
                 final LogicalKeyValueSegment dbSegment = bufferSegment.dbSegment();
@@ -206,7 +206,7 @@ public class RocksDBVersionedStoreRestoreWriteBuffer {
     }
 
     @Override
-    public WriteBufferSegmentWithDbFallback getOrCreateSegmentIfLive(final long segmentId, final ProcessorContext context, final long streamTime) {
+    public WriteBufferSegmentWithDbFallback getOrCreateSegmentIfLive(final long segmentId, final StateStoreContext context, final long streamTime) {
         if (segmentsWriteBuffer.containsKey(segmentId)) {
             return segmentsWriteBuffer.get(segmentId);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
index a3ef2426c3c..18086a5441b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
 
 import java.util.List;
 
@@ -28,11 +28,11 @@ interface Segments<S extends Segment> {
 
     S segmentForTimestamp(final long timestamp);
 
-    S getOrCreateSegmentIfLive(final long segmentId, final ProcessorContext context, final long streamTime);
+    S getOrCreateSegmentIfLive(final long segmentId, final StateStoreContext context, final long streamTime);
 
-    S getOrCreateSegment(final long segmentId, final ProcessorContext context);
+    S getOrCreateSegment(final long segmentId, final StateStoreContext context);
 
-    void openExisting(final ProcessorContext context, final long streamTime);
+    void openExisting(final StateStoreContext context, final long streamTime);
 
     List<S> segments(final long timeFrom, final long timeTo, final boolean forward);
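The parameter type now used throughout the `Segments` interface is the context a store receives through `StateStore#init(StateStoreContext, StateStore)`, and it carries everything these internal classes actually touch: `stateDir()`, `taskId()`, `appConfigs()`, and the metrics registry reached via `ProcessorContextUtils`, without the record-processing surface of `ProcessorContext`. A minimal sketch of a store written against that narrower interface (hypothetical class, assuming a Streams version in which the deprecated `ProcessorContext` overload of `init` no longer exists):

```java
import java.io.File;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;

// Hypothetical store, not part of this patch: demonstrates that the store
// lifecycle only needs the StateStoreContext surface (stateDir, taskId,
// appConfigs, register), which is why the internal Segments classes can
// drop ProcessorContext.
public class InMemoryBytesStore implements StateStore {

    private final String name;
    private final Map<Bytes, byte[]> data = new HashMap<>();
    private volatile boolean open = false;

    public InMemoryBytesStore(final String name) {
        this.name = name;
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public void init(final StateStoreContext context, final StateStore root) {
        // The same context values the segment classes read in openExisting()/openDB():
        final File stateDir = context.stateDir();
        final Object recordingLevel = context.appConfigs().get(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG);
        System.out.println("task " + context.taskId() + " opening " + name
            + " under " + stateDir + " (metrics: " + recordingLevel + ")");

        // Register for restoration; changelog records are replayed into the map.
        context.register(root, (key, value) -> data.put(Bytes.wrap(key), value));
        open = true;
    }

    @Override
    public void flush() { }

    @Override
    public void close() {
        data.clear();
        open = false;
    }

    @Override
    public boolean persistent() {
        return false;
    }

    @Override
    public boolean isOpen() {
        return open;
    }
}
```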
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
index 597d7bf0ce0..70fae503060 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegments.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
 
@@ -37,7 +37,7 @@ class TimestampedSegments extends AbstractSegments<TimestampedSegment> {
 
     @Override
     public TimestampedSegment getOrCreateSegment(final long segmentId,
-                                                 final ProcessorContext context) {
+                                                 final StateStoreContext context) {
         if (segments.containsKey(segmentId)) {
             return segments.get(segmentId);
         } else {
@@ -55,7 +55,7 @@
 
     @Override
     public TimestampedSegment getOrCreateSegmentIfLive(final long segmentId,
-                                                       final ProcessorContext context,
+                                                       final StateStoreContext context,
                                                        final long streamTime) {
         final TimestampedSegment segment = super.getOrCreateSegmentIfLive(segmentId, context, streamTime);
         cleanupExpiredSegments(streamTime);
@@ -63,7 +63,7 @@
     }
 
     @Override
-    public void openExisting(final ProcessorContext context, final long streamTime) {
+    public void openExisting(final StateStoreContext context, final long streamTime) {
         metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), context.taskId());
         super.openExisting(context, streamTime);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 0de1e1e606a..fdfed28ece2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -27,12 +27,13 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
@@ -62,7 +63,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 /**
- * A component that provides a {@link #context() ProcessingContext} that can be supplied to a {@link KeyValueStore} so that
+ * A component that provides a {@link #context() StateStoreContext} that can be supplied to a {@link KeyValueStore} so that
  * all entries written to the Kafka topic by the store during {@link KeyValueStore#flush()} are captured for testing purposes.
  * This class simplifies testing of various {@link KeyValueStore} instances, especially those that use
  * {@link MeteredKeyValueStore} to monitor and write its entries to the Kafka topic.
@@ -110,7 +111,7 @@
  *
  * <h2>Restoring a store</h2>
  * This component can be used to test whether a {@link KeyValueStore} implementation properly
- * {@link ProcessorContext#register(StateStore, StateRestoreCallback) registers itself} with the {@link ProcessorContext}, so that
+ * {@link StateStoreContext#register(StateStore, StateRestoreCallback) registers itself} with the {@link StateStoreContext}, so that
  * the persisted contents of a store are properly restored from the flushed entries when the store instance is started.
 * <p>
 * To do this, create an instance of this driver component, {@link #addEntryToRestoreLog(Object, Object) add entries} that will be
@@ -149,7 +150,7 @@ public class KeyValueStoreTestDriver<K, V> {
    /**
     * Create a driver object that will have a {@link #context()} that records messages
-     * {@link ProcessorContext#forward(Object, Object) forwarded} by the store and that provides default serializers and
+     * {@link ProcessorContext#forward(Record) forwarded} by the store and that provides default serializers and
     * deserializers for the given built-in key and value types (e.g., {@code String.class}, {@code Integer.class},
     * {@code Long.class}, and {@code byte[].class}). This can be used when store is created to rely upon the
     * ProcessorContext's default key and value serializers and deserializers.
@@ -167,14 +168,14 @@
    /**
     * Create a driver object that will have a {@link #context()} that records messages
-     * {@link ProcessorContext#forward(Object, Object) forwarded} by the store and that provides the specified serializers and
+     * {@link ProcessorContext#forward(Record) forwarded} by the store and that provides the specified serializers and
     * deserializers. This can be used when store is created to rely upon the ProcessorContext's default key and value serializers
     * and deserializers.
     *
-     * @param keySerializer the key serializer for the {@link ProcessorContext}; may not be null
-     * @param keyDeserializer the key deserializer for the {@link ProcessorContext}; may not be null
-     * @param valueSerializer the value serializer for the {@link ProcessorContext}; may not be null
-     * @param valueDeserializer the value deserializer for the {@link ProcessorContext}; may not be null
+     * @param keySerializer the key serializer for the {@link StateStoreContext}; may not be null
+     * @param keyDeserializer the key deserializer for the {@link StateStoreContext}; may not be null
+     * @param valueSerializer the value serializer for the {@link StateStoreContext}; may not be null
+     * @param valueDeserializer the value deserializer for the {@link StateStoreContext}; may not be null
     * @return the test driver; never null
     */
    public static <K, V> KeyValueStoreTestDriver<K, V> create(final Serializer<K> keySerializer,
@@ -195,6 +196,7 @@
    private final InternalMockProcessorContext context;
    private final StateSerdes<K, V> stateSerdes;
 
+    @SuppressWarnings("resource")
    private KeyValueStoreTestDriver(final StateSerdes<K, V> serdes) {
        props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "application-id");
@@ -264,7 +266,7 @@
        stateDir.mkdirs();
        stateSerdes = serdes;
 
-        context = new InternalMockProcessorContext(stateDir, serdes.keySerde(), serdes.valueSerde(), recordCollector, null) {
+        context = new InternalMockProcessorContext<>(stateDir, serdes.keySerde(), serdes.valueSerde(), recordCollector, null) {
            final ThreadCache cache = new ThreadCache(new LogContext("testCache "), 1024 * 1024L, metrics());
 
            @Override
@@ -298,7 +300,7 @@
    /**
     * Get the entries that are restored to a KeyValueStore when it is constructed with this driver's {@link #context()
-     * ProcessorContext}.
+     * StateStoreContext}.
     *
     * @return the restore entries; never null but possibly a null iterator
     */
@@ -345,7 +347,7 @@
     * {@link #flushedEntryRemoved(Object)} methods.
     * <p>
     * If the {@link KeyValueStore}'s are to be restored upon its startup, be sure to {@link #addEntryToRestoreLog(Object, Object)
-     * add the restore entries} before creating the store with the {@link ProcessorContext} returned by this method.
+     * add the restore entries} before creating the store with the {@link StateStoreContext} returned by this method.
     *
     * @return the processing context; never null
     * @see #addEntryToRestoreLog(Object, Object)
@@ -378,7 +380,7 @@
    /**
     * Utility method to compute the number of entries within the store.
     *
-     * @param store the key value store using this {@link #context()}.
+     * @param store the key value store using this {@link #context() StateStoreContext}.
     * @return the number of entries
     */
    public int sizeOf(final KeyValueStore<K, V> store) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
index 119bda69c9f..e71704f32af 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.query.Position;
@@ -44,8 +43,6 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.not;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.STRICT_STUBS)
@@ -68,11 +65,7 @@ public class KeyValueSegmentTest {
         final String directoryPath = TestUtils.tempDirectory().getAbsolutePath();
         final File directory = new File(directoryPath);
 
-        final ProcessorContext mockContext = mock(ProcessorContext.class);
-        when(mockContext.appConfigs()).thenReturn(mkMap(mkEntry(METRICS_RECORDING_LEVEL_CONFIG, "INFO")));
-        when(mockContext.stateDir()).thenReturn(directory);
-
-        segment.openDB(mockContext.appConfigs(), mockContext.stateDir());
+        segment.openDB(mkMap(mkEntry(METRICS_RECORDING_LEVEL_CONFIG, "INFO")), directory);
 
         assertTrue(new File(directoryPath, "window").exists());
         assertTrue(new File(directoryPath + File.separator + "window", "segment").exists());
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
index 82a76ba13a6..633d14c1e63 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.query.Position;
@@ -44,8 +43,6 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.not;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.STRICT_STUBS)
@@ -68,11 +65,7 @@ public class TimestampedSegmentTest {
         final String directoryPath = TestUtils.tempDirectory().getAbsolutePath();
         final File directory = new File(directoryPath);
 
-        final ProcessorContext mockContext = mock(ProcessorContext.class);
-        when(mockContext.appConfigs()).thenReturn(mkMap(mkEntry(METRICS_RECORDING_LEVEL_CONFIG, "INFO")));
-        when(mockContext.stateDir()).thenReturn(directory);
-
-        segment.openDB(mockContext.appConfigs(), mockContext.stateDir());
+        segment.openDB(mkMap(mkEntry(METRICS_RECORDING_LEVEL_CONFIG, "INFO")), directory);
 
         assertTrue(new File(directoryPath, "window").exists());
         assertTrue(new File(directoryPath + File.separator + "window", "segment").exists());
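Both test changes follow the same pattern: once `openDB(Map, File)` takes the two values it needs (an app-config map and a state directory) instead of a context object, the Mockito mock that existed only to ferry those values through `appConfigs()` and `stateDir()` can be deleted. A self-contained sketch of the shape of that simplification; the class below and its `openDB` are stand-ins for illustration, not the internal implementation:

```java
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG;

import java.io.File;
import java.util.Map;

// Hypothetical stand-in for the internal Segment#openDB(Map, File) called by
// the tests; illustrates why no mock is needed once the method takes plain values.
public class DirectArgumentsExample {

    static void openDB(final Map<String, Object> configs, final File stateDir) {
        System.out.println("opening RocksDB under " + stateDir + " with " + configs);
    }

    public static void main(final String[] args) {
        final File directory = new File(System.getProperty("java.io.tmpdir"), "segment-test");

        // Previously: mock(ProcessorContext.class) plus two when(...).thenReturn(...)
        // stubs existed solely to hand these two values through the context.
        // Passing them directly keeps the test free of Mockito plumbing.
        openDB(mkMap(mkEntry(METRICS_RECORDING_LEVEL_CONFIG, "INFO")), directory);
    }
}
```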