KAFKA-13722: Refactor Kafka Streams store interfaces (#18243)

Refactor Segments and TimestampedSegments so they no longer use the
old ProcessorContext.

Reviewers: Bruno Cadonna <bruno@confluent.io>
Matthias J. Sax authored on 2024-12-19 11:55:57 -08:00; committed by GitHub
parent fe9847dd4a
commit b3b40bb77b
10 changed files with 43 additions and 56 deletions
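At a glance, the refactoring swaps the context type on the internal Segments interface. A condensed before/after sketch of the affected signatures, assembled from the hunks below (illustrative only; parameter modifiers and the interface's remaining methods are omitted):

    // before this commit: segment lifecycle methods took the old ProcessorContext
    interface Segments<S extends Segment> {
        S getOrCreateSegmentIfLive(long segmentId, ProcessorContext context, long streamTime);
        S getOrCreateSegment(long segmentId, ProcessorContext context);
        void openExisting(ProcessorContext context, long streamTime);
    }

    // after this commit: the same methods take StateStoreContext instead
    interface Segments<S extends Segment> {
        S getOrCreateSegmentIfLive(long segmentId, StateStoreContext context, long streamTime);
        S getOrCreateSegment(long segmentId, StateStoreContext context);
        void openExisting(StateStoreContext context, long streamTime);
    }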

File: AbstractSegments.java

@@ -17,7 +17,7 @@
 package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.streams.errors.ProcessorStateException;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.query.Position;
 import org.slf4j.Logger;
@@ -81,7 +81,7 @@ abstract class AbstractSegments<S extends Segment> implements Segments<S> {
     @Override
     public S getOrCreateSegmentIfLive(final long segmentId,
-                                      final ProcessorContext context,
+                                      final StateStoreContext context,
                                       final long streamTime) {
         final long minLiveTimestamp = streamTime - retentionPeriod;
         final long minLiveSegment = segmentId(minLiveTimestamp);
@@ -95,7 +95,7 @@ abstract class AbstractSegments<S extends Segment> implements Segments<S> {
     }
     @Override
-    public void openExisting(final ProcessorContext context, final long streamTime) {
+    public void openExisting(final StateStoreContext context, final long streamTime) {
         try {
             final File dir = new File(context.stateDir(), name);
             if (dir.exists()) {

File: KeyValueSegments.java

@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
@@ -37,7 +37,7 @@ class KeyValueSegments extends AbstractSegments<KeyValueSegment> {
     @Override
     public KeyValueSegment getOrCreateSegment(final long segmentId,
-                                              final ProcessorContext context) {
+                                              final StateStoreContext context) {
         if (segments.containsKey(segmentId)) {
             return segments.get(segmentId);
         } else {
@@ -55,7 +55,7 @@ class KeyValueSegments extends AbstractSegments<KeyValueSegment> {
     @Override
     public KeyValueSegment getOrCreateSegmentIfLive(final long segmentId,
-                                                    final ProcessorContext context,
+                                                    final StateStoreContext context,
                                                     final long streamTime) {
         final KeyValueSegment segment = super.getOrCreateSegmentIfLive(segmentId, context, streamTime);
         cleanupExpiredSegments(streamTime);
@@ -63,7 +63,7 @@ class KeyValueSegments extends AbstractSegments<KeyValueSegment> {
     }
     @Override
-    public void openExisting(final ProcessorContext context, final long streamTime) {
+    public void openExisting(final StateStoreContext context, final long streamTime) {
         metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), context.taskId());
         super.openExisting(context, streamTime);
     }

File: LogicalKeyValueSegments.java

@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
@@ -29,8 +29,8 @@ import java.util.Map;
  * Regular segments with {@code segmentId >= 0} expire according to the specified
  * retention period. "Reserved" segments with {@code segmentId < 0} do not expire
  * and are completely separate from regular segments in that methods such as
- * {@link #segmentForTimestamp(long)}, {@link #getOrCreateSegment(long, ProcessorContext)},
- * {@link #getOrCreateSegmentIfLive(long, ProcessorContext, long)},
+ * {@link #segmentForTimestamp(long)}, {@link #getOrCreateSegment(long, StateStoreContext)},
+ * {@link #getOrCreateSegmentIfLive(long, StateStoreContext, long)},
  * {@link #segments(long, long, boolean)}, and {@link #allSegments(boolean)}
  * only return regular segments and not reserved segments. The methods {@link #flush()}
  * and {@link #close()} flush and close both regular and reserved segments, due to
@@ -62,7 +62,7 @@ public class LogicalKeyValueSegments extends AbstractSegments<LogicalKeyValueSeg
     @Override
     public LogicalKeyValueSegment getOrCreateSegment(final long segmentId,
-                                                     final ProcessorContext context) {
+                                                     final StateStoreContext context) {
         if (segments.containsKey(segmentId)) {
             return segments.get(segmentId);
         } else {
@@ -103,7 +103,7 @@ public class LogicalKeyValueSegments extends AbstractSegments<LogicalKeyValueSeg
     }
     @Override
-    public void openExisting(final ProcessorContext context, final long streamTime) {
+    public void openExisting(final StateStoreContext context, final long streamTime) {
         metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), context.taskId());
         physicalStore.openDB(context.appConfigs(), context.stateDir());
     }

File: RocksDBVersionedStore.java

@@ -23,7 +23,6 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
@@ -102,7 +101,7 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
     private final RocksDBVersionedStoreClient versionedStoreClient;
     private final RocksDBVersionedStoreRestoreWriteBuffer restoreWriteBuffer;
-    private InternalProcessorContext internalProcessorContext;
+    private InternalProcessorContext<?, ?> internalProcessorContext;
     private Sensor expiredRecordSensor;
     private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
     private boolean consistencyEnabled = false;
@@ -489,7 +488,7 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
         /**
          * @return the segment with the provided id, or {@code null} if the segment is expired
          */
-        T getOrCreateSegmentIfLive(long segmentId, ProcessorContext context, long streamTime);
+        T getOrCreateSegmentIfLive(long segmentId, StateStoreContext context, long streamTime);
         /**
          * @return all segments in the store which contain timestamps at least the provided
@@ -525,7 +524,7 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte
         }
         @Override
-        public LogicalKeyValueSegment getOrCreateSegmentIfLive(final long segmentId, final ProcessorContext context, final long streamTime) {
+        public LogicalKeyValueSegment getOrCreateSegmentIfLive(final long segmentId, final StateStoreContext context, final long streamTime) {
            return segmentStores.getOrCreateSegmentIfLive(segmentId, context, streamTime);
         }

File: RocksDBVersionedStoreRestoreWriteBuffer.java

@@ -18,7 +18,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.RocksDBVersionedStoreClient;
 import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient;
 import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment;
@@ -91,7 +91,7 @@ public class RocksDBVersionedStoreRestoreWriteBuffer {
         // older segments/stores before later ones
         try (final WriteBatch segmentsBatch = new WriteBatch()) {
             final List<WriteBufferSegmentWithDbFallback> allSegments = restoreClient.reversedSegments(Long.MIN_VALUE);
-            if (allSegments.size() > 0) {
+            if (!allSegments.isEmpty()) {
                 // collect entries into write batch
                 for (final WriteBufferSegmentWithDbFallback bufferSegment : allSegments) {
                     final LogicalKeyValueSegment dbSegment = bufferSegment.dbSegment();
@@ -206,7 +206,7 @@ public class RocksDBVersionedStoreRestoreWriteBuffer {
         }
         @Override
-        public WriteBufferSegmentWithDbFallback getOrCreateSegmentIfLive(final long segmentId, final ProcessorContext context, final long streamTime) {
+        public WriteBufferSegmentWithDbFallback getOrCreateSegmentIfLive(final long segmentId, final StateStoreContext context, final long streamTime) {
             if (segmentsWriteBuffer.containsKey(segmentId)) {
                 return segmentsWriteBuffer.get(segmentId);
             }

File: Segments.java

@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import java.util.List;
@@ -28,11 +28,11 @@ interface Segments<S extends Segment> {
     S segmentForTimestamp(final long timestamp);
-    S getOrCreateSegmentIfLive(final long segmentId, final ProcessorContext context, final long streamTime);
+    S getOrCreateSegmentIfLive(final long segmentId, final StateStoreContext context, final long streamTime);
-    S getOrCreateSegment(final long segmentId, final ProcessorContext context);
+    S getOrCreateSegment(final long segmentId, final StateStoreContext context);
-    void openExisting(final ProcessorContext context, final long streamTime);
+    void openExisting(final StateStoreContext context, final long streamTime);
     List<S> segments(final long timeFrom, final long timeTo, final boolean forward);

File: TimestampedSegments.java

@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
@@ -37,7 +37,7 @@ class TimestampedSegments extends AbstractSegments<TimestampedSegment> {
     @Override
     public TimestampedSegment getOrCreateSegment(final long segmentId,
-                                                 final ProcessorContext context) {
+                                                 final StateStoreContext context) {
         if (segments.containsKey(segmentId)) {
             return segments.get(segmentId);
         } else {
@@ -55,7 +55,7 @@ class TimestampedSegments extends AbstractSegments<TimestampedSegment> {
     @Override
     public TimestampedSegment getOrCreateSegmentIfLive(final long segmentId,
-                                                       final ProcessorContext context,
+                                                       final StateStoreContext context,
                                                        final long streamTime) {
         final TimestampedSegment segment = super.getOrCreateSegmentIfLive(segmentId, context, streamTime);
         cleanupExpiredSegments(streamTime);
@@ -63,7 +63,7 @@ class TimestampedSegments extends AbstractSegments<TimestampedSegment> {
     }
     @Override
-    public void openExisting(final ProcessorContext context, final long streamTime) {
+    public void openExisting(final StateStoreContext context, final long streamTime) {
         metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), context.taskId());
         super.openExisting(context, streamTime);
     }

File: KeyValueStoreTestDriver.java

@@ -27,12 +27,13 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
@@ -62,7 +63,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 /**
- * A component that provides a {@link #context() ProcessingContext} that can be supplied to a {@link KeyValueStore} so that
+ * A component that provides a {@link #context() StateStoreContext} that can be supplied to a {@link KeyValueStore} so that
  * all entries written to the Kafka topic by the store during {@link KeyValueStore#flush()} are captured for testing purposes.
  * This class simplifies testing of various {@link KeyValueStore} instances, especially those that use
  * {@link MeteredKeyValueStore} to monitor and write its entries to the Kafka topic.
@@ -110,7 +111,7 @@ import static org.mockito.Mockito.when;
  *
  * <h2>Restoring a store</h2>
  * This component can be used to test whether a {@link KeyValueStore} implementation properly
- * {@link ProcessorContext#register(StateStore, StateRestoreCallback) registers itself} with the {@link ProcessorContext}, so that
+ * {@link StateStoreContext#register(StateStore, StateRestoreCallback) registers itself} with the {@link StateStoreContext}, so that
 * the persisted contents of a store are properly restored from the flushed entries when the store instance is started.
 * <p>
 * To do this, create an instance of this driver component, {@link #addEntryToRestoreLog(Object, Object) add entries} that will be
@@ -149,7 +150,7 @@ public class KeyValueStoreTestDriver<K, V> {
     /**
      * Create a driver object that will have a {@link #context()} that records messages
-     * {@link ProcessorContext#forward(Object, Object) forwarded} by the store and that provides default serializers and
+     * {@link ProcessorContext#forward(Record) forwarded} by the store and that provides default serializers and
      * deserializers for the given built-in key and value types (e.g., {@code String.class}, {@code Integer.class},
      * {@code Long.class}, and {@code byte[].class}). This can be used when store is created to rely upon the
      * ProcessorContext's default key and value serializers and deserializers.
@@ -167,14 +168,14 @@
     /**
      * Create a driver object that will have a {@link #context()} that records messages
-     * {@link ProcessorContext#forward(Object, Object) forwarded} by the store and that provides the specified serializers and
+     * {@link ProcessorContext#forward(Record) forwarded} by the store and that provides the specified serializers and
      * deserializers. This can be used when store is created to rely upon the ProcessorContext's default key and value serializers
      * and deserializers.
      *
-     * @param keySerializer the key serializer for the {@link ProcessorContext}; may not be null
-     * @param keyDeserializer the key deserializer for the {@link ProcessorContext}; may not be null
-     * @param valueSerializer the value serializer for the {@link ProcessorContext}; may not be null
-     * @param valueDeserializer the value deserializer for the {@link ProcessorContext}; may not be null
+     * @param keySerializer the key serializer for the {@link StateStoreContext}; may not be null
+     * @param keyDeserializer the key deserializer for the {@link StateStoreContext}; may not be null
+     * @param valueSerializer the value serializer for the {@link StateStoreContext}; may not be null
+     * @param valueDeserializer the value deserializer for the {@link StateStoreContext}; may not be null
      * @return the test driver; never null
      */
     public static <K, V> KeyValueStoreTestDriver<K, V> create(final Serializer<K> keySerializer,
@@ -195,6 +196,7 @@ public class KeyValueStoreTestDriver<K, V> {
     private final InternalMockProcessorContext<?, ?> context;
     private final StateSerdes<K, V> stateSerdes;
+    @SuppressWarnings("resource")
     private KeyValueStoreTestDriver(final StateSerdes<K, V> serdes) {
         props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "application-id");
@@ -264,7 +266,7 @@
         stateDir.mkdirs();
         stateSerdes = serdes;
-        context = new InternalMockProcessorContext<Object, Object>(stateDir, serdes.keySerde(), serdes.valueSerde(), recordCollector, null) {
+        context = new InternalMockProcessorContext<>(stateDir, serdes.keySerde(), serdes.valueSerde(), recordCollector, null) {
             final ThreadCache cache = new ThreadCache(new LogContext("testCache "), 1024 * 1024L, metrics());
             @Override
@@ -298,7 +300,7 @@
     /**
      * Get the entries that are restored to a KeyValueStore when it is constructed with this driver's {@link #context()
-     * ProcessorContext}.
+     * StateStoreContext}.
      *
      * @return the restore entries; never null but possibly a null iterator
      */
@@ -345,7 +347,7 @@
      * {@link #flushedEntryRemoved(Object)} methods.
      * <p>
      * If the {@link KeyValueStore}'s are to be restored upon its startup, be sure to {@link #addEntryToRestoreLog(Object, Object)
-     * add the restore entries} before creating the store with the {@link ProcessorContext} returned by this method.
+     * add the restore entries} before creating the store with the {@link StateStoreContext} returned by this method.
      *
      * @return the processing context; never null
      * @see #addEntryToRestoreLog(Object, Object)
@@ -378,7 +380,7 @@
     /**
      * Utility method to compute the number of entries within the store.
      *
-     * @param store the key value store using this {@link #context()}.
+     * @param store the key value store using this {@link #context() StateStoreContext}.
      * @return the number of entries
      */
     public int sizeOf(final KeyValueStore<K, V> store) {
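Taken together, the Javadoc above describes a test flow roughly like the sketch below. It uses only members referenced in this file (create(...), addEntryToRestoreLog(...), context(), sizeOf(...)); buildStoreUnderTest(...) is a hypothetical helper, since store construction depends on the implementation being tested:

    final KeyValueStoreTestDriver<String, Integer> driver = KeyValueStoreTestDriver.create(
        new StringSerializer(), new StringDeserializer(),
        new IntegerSerializer(), new IntegerDeserializer());

    // entries added here are replayed into the store when it registers itself
    // with the driver's StateStoreContext during init()
    driver.addEntryToRestoreLog("restored-key", 42);

    // hypothetical helper: build the store under test against driver.context();
    // after store.flush(), the driver captures the changelog writes so the test can
    // assert on them, e.g. via sizeOf(store) or flushedEntryRemoved(key)
    final KeyValueStore<String, Integer> store = buildStoreUnderTest(driver.context());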

File: KeyValueSegmentTest.java

@@ -18,7 +18,6 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.query.Position;
@@ -44,8 +43,6 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.not;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.STRICT_STUBS)
@@ -68,11 +65,7 @@ public class KeyValueSegmentTest {
         final String directoryPath = TestUtils.tempDirectory().getAbsolutePath();
         final File directory = new File(directoryPath);
-        final ProcessorContext mockContext = mock(ProcessorContext.class);
-        when(mockContext.appConfigs()).thenReturn(mkMap(mkEntry(METRICS_RECORDING_LEVEL_CONFIG, "INFO")));
-        when(mockContext.stateDir()).thenReturn(directory);
-        segment.openDB(mockContext.appConfigs(), mockContext.stateDir());
+        segment.openDB(mkMap(mkEntry(METRICS_RECORDING_LEVEL_CONFIG, "INFO")), directory);
         assertTrue(new File(directoryPath, "window").exists());
         assertTrue(new File(directoryPath + File.separator + "window", "segment").exists());

File: TimestampedSegmentTest.java

@@ -18,7 +18,6 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.query.Position;
@@ -44,8 +43,6 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.not;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.STRICT_STUBS)
@@ -68,11 +65,7 @@ public class TimestampedSegmentTest {
         final String directoryPath = TestUtils.tempDirectory().getAbsolutePath();
         final File directory = new File(directoryPath);
-        final ProcessorContext mockContext = mock(ProcessorContext.class);
-        when(mockContext.appConfigs()).thenReturn(mkMap(mkEntry(METRICS_RECORDING_LEVEL_CONFIG, "INFO")));
-        when(mockContext.stateDir()).thenReturn(directory);
-        segment.openDB(mockContext.appConfigs(), mockContext.stateDir());
+        segment.openDB(mkMap(mkEntry(METRICS_RECORDING_LEVEL_CONFIG, "INFO")), directory);
         assertTrue(new File(directoryPath, "window").exists());
         assertTrue(new File(directoryPath + File.separator + "window", "segment").exists());