KAFKA-13498: track position in remaining state stores (#11541)

Reviewers: Vicky Papavasileiou <vpapavasileiou@confluent.io>, John Roesler<vvcephei@apache.org>
This commit is contained in:
Patrick Stuedi 2021-12-01 18:49:10 +01:00 committed by GitHub
parent 1bf418beaf
commit 62f73c30d3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 269 additions and 6 deletions

View File

@ -54,6 +54,7 @@ public class CachingKeyValueStore
private InternalProcessorContext<?, ?> context;
private Thread streamThread;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private Position position;
CachingKeyValueStore(final KeyValueStore<Bytes, byte[]> underlying) {
super(underlying);
@ -80,8 +81,13 @@ public class CachingKeyValueStore
streamThread = Thread.currentThread();
}
Position getPosition() {
return position;
}
private void initInternal(final InternalProcessorContext<?, ?> context) {
this.context = context;
this.position = Position.emptyPosition();
this.cacheName = ThreadCache.nameSpaceFromTaskIdAndStore(context.taskId().toString(), name());
this.context.registerCacheFlushListener(cacheName, entries -> {
@ -158,6 +164,8 @@ public class CachingKeyValueStore
context.timestamp(),
context.partition(),
context.topic()));
position = position.update(context.topic(), context.partition(), context.offset());
}
@Override

View File

@ -24,7 +24,10 @@ import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.KeyValueIterator;
@ -65,12 +68,16 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
private volatile boolean open = false;
private StateStoreContext stateStoreContext;
private Position position;
InMemorySessionStore(final String name,
final long retentionPeriod,
final String metricScope) {
this.name = name;
this.retentionPeriod = retentionPeriod;
this.metricScope = metricScope;
this.position = Position.emptyPosition();
}
@Override
@ -105,6 +112,17 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
open = true;
}
@Override
public void init(final StateStoreContext context,
final StateStore root) {
init(StoreToProcessorContextAdapter.adapt(context), root);
this.stateStoreContext = context;
}
Position getPosition() {
return position;
}
@Override
public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) {
removeExpiredSegments();
@ -129,6 +147,11 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
remove(sessionKey);
}
}
if (stateStoreContext != null && stateStoreContext.recordMetadata().isPresent()) {
final RecordMetadata meta = stateStoreContext.recordMetadata().get();
position = position.update(meta.topic(), meta.partition(), meta.offset());
}
}
@Override

View File

@ -24,7 +24,10 @@ import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.KeyValueIterator;
@ -68,6 +71,9 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
private volatile boolean open = false;
private Position position;
private StateStoreContext stateStoreContext;
public InMemoryWindowStore(final String name,
final long retentionPeriod,
final long windowSize,
@ -78,6 +84,7 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
this.windowSize = windowSize;
this.retainDuplicates = retainDuplicates;
this.metricScope = metricScope;
this.position = Position.emptyPosition();
}
@Override
@ -106,6 +113,17 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
open = true;
}
@Override
public void init(final StateStoreContext context,
final StateStore root) {
init(StoreToProcessorContextAdapter.adapt(context), root);
this.stateStoreContext = context;
}
Position getPosition() {
return position;
}
@Override
public void put(final Bytes key, final byte[] value, final long windowStartTimestamp) {
removeExpiredSegments();
@ -131,6 +149,11 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
});
}
}
if (stateStoreContext != null && stateStoreContext.recordMetadata().isPresent()) {
final RecordMetadata meta = stateStoreContext.recordMetadata().get();
position = position.update(meta.topic(), meta.partition(), meta.offset());
}
}
@Override

View File

@ -44,10 +44,12 @@ public class Position {
}
public Position update(final String topic, final int partition, final long offset) {
position.computeIfAbsent(topic, k -> new ConcurrentHashMap<>());
final ConcurrentMap<Integer, AtomicLong> topicMap = position.get(topic);
topicMap.computeIfAbsent(partition, k -> new AtomicLong(0));
topicMap.get(partition).getAndAccumulate(offset, Math::max);
if (topic != null) {
position.computeIfAbsent(topic, k -> new ConcurrentHashMap<>());
final ConcurrentMap<Integer, AtomicLong> topicMap = position.get(topic);
topicMap.computeIfAbsent(partition, k -> new AtomicLong(0));
topicMap.get(partition).getAndAccumulate(offset, Math::max);
}
return this;
}

View File

@ -18,6 +18,9 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
@ -26,8 +29,22 @@ public class RocksDBSessionStore
extends WrappedStateStore<SegmentedBytesStore, Object, Object>
implements SessionStore<Bytes, byte[]> {
private Position position;
private StateStoreContext stateStoreContext;
RocksDBSessionStore(final SegmentedBytesStore bytesStore) {
super(bytesStore);
this.position = Position.emptyPosition();
}
@Override
public void init(final StateStoreContext context, final StateStore root) {
super.init(context, root);
this.stateStoreContext = context;
}
Position getPosition() {
return position;
}
@Override
@ -121,5 +138,10 @@ public class RocksDBSessionStore
@Override
public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) {
wrapped().put(SessionKeySchema.toBinary(sessionKey), aggregate);
if (stateStoreContext != null && stateStoreContext.recordMetadata().isPresent()) {
final RecordMetadata meta = stateStoreContext.recordMetadata().get();
position = position.update(meta.topic(), meta.partition(), meta.offset());
}
}
}

View File

@ -18,6 +18,9 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
@ -31,12 +34,26 @@ public class RocksDBWindowStore
private int seqnum = 0;
private Position position;
private StateStoreContext stateStoreContext;
RocksDBWindowStore(final SegmentedBytesStore bytesStore,
final boolean retainDuplicates,
final long windowSize) {
super(bytesStore);
this.retainDuplicates = retainDuplicates;
this.windowSize = windowSize;
this.position = Position.emptyPosition();
}
@Override
public void init(final StateStoreContext context, final StateStore root) {
super.init(context, root);
this.stateStoreContext = context;
}
Position getPosition() {
return position;
}
@Override
@ -45,6 +62,11 @@ public class RocksDBWindowStore
if (!(value == null && retainDuplicates)) {
maybeUpdateSeqnumForDups();
wrapped().put(WindowKeySchema.toStoreKeyBinary(key, windowStartTimestamp, seqnum), value);
if (stateStoreContext != null && stateStoreContext.recordMetadata().isPresent()) {
final RecordMetadata meta = stateStoreContext.recordMetadata().get();
position = position.update(meta.topic(), meta.partition(), meta.offset());
}
}
}
@ -112,4 +134,6 @@ public class RocksDBWindowStore
seqnum = (seqnum + 1) & 0x7FFFFFFF;
}
}
}

View File

@ -77,7 +77,7 @@ public abstract class AbstractSessionBytesStoreTest {
private MockRecordCollector recordCollector;
private InternalMockProcessorContext context;
InternalMockProcessorContext context;
abstract <K, V> SessionStore<K, V> buildSessionStore(final long retentionPeriod,
final Serde<K> keySerde,

View File

@ -217,6 +217,29 @@ public class CachingInMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest
assertEquals(0, underlyingStore.approximateNumEntries());
}
@Test
public void shouldMatchPositionAfterPut() {
final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
entries.add(new KeyValue<>(bytesKey("key1"), bytesValue("value1")));
entries.add(new KeyValue<>(bytesKey("key2"), bytesValue("value2")));
entries.add(new KeyValue<>(bytesKey("key3"), bytesValue("value3")));
entries.add(new KeyValue<>(bytesKey("key4"), bytesValue("value4")));
entries.add(new KeyValue<>(bytesKey("key5"), bytesValue("value5")));
final MonotonicProcessorRecordContext recordContext = new MonotonicProcessorRecordContext("input", 0, false);
context.setRecordContext(recordContext);
final Position expected = Position.emptyPosition();
for (final KeyValue<Bytes, byte[]> k : entries) {
store.put(k.key, k.value);
expected.update(recordContext.topic(), recordContext.partition(), recordContext.offset());
recordContext.kick();
}
final Position actual = store.getPosition();
assertEquals(expected, actual);
}
private byte[] bytesValue(final String value) {
return value.getBytes();
}

View File

@ -16,7 +16,10 @@
*/
package org.apache.kafka.streams.state.internals;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.state.KeyValueIterator;
@ -29,6 +32,8 @@ import java.util.HashSet;
import static java.time.Duration.ofMillis;
import static org.apache.kafka.test.StreamsTestUtils.valuesToSet;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -83,4 +88,31 @@ public class InMemorySessionStoreTest extends AbstractSessionBytesStoreTest {
assertFalse(sessionStore.findSessions("a", "b", 0L, 20L).hasNext());
}
@Test
public void shouldMatchPositionAfterPut() {
final List<KeyValue<Windowed<String>, Long>> entries = new ArrayList<>();
entries.add(new KeyValue<>(new Windowed<String>("a", new SessionWindow(0, 0)), 1L));
entries.add(new KeyValue<>(new Windowed<String>("aa", new SessionWindow(0, 10)), 2L));
entries.add(new KeyValue<>(new Windowed<String>("a", new SessionWindow(10, 20)), 3L));
final MonotonicProcessorRecordContext recordContext = new MonotonicProcessorRecordContext("input", 0);
context.setRecordContext(recordContext);
final Position expected = Position.emptyPosition();
long offset = 0;
for (final KeyValue<Windowed<String>, Long> k : entries) {
sessionStore.put(k.key, k.value);
expected.update("input", 0, offset);
offset++;
}
final MeteredSessionStore<String, Long> meteredSessionStore = (MeteredSessionStore<String, Long>) sessionStore;
final ChangeLoggingSessionBytesStore changeLoggingSessionBytesStore = (ChangeLoggingSessionBytesStore) meteredSessionStore.wrapped();
final InMemorySessionStore inMemorySessionStore = (InMemorySessionStore) changeLoggingSessionBytesStore.wrapped();
final Position actual = inMemorySessionStore.getPosition();
assertThat(expected, is(actual));
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
import java.util.ArrayList;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
@ -32,6 +33,8 @@ import java.util.List;
import static java.time.Duration.ofMillis;
import static org.apache.kafka.streams.state.internals.WindowKeySchema.toStoreKeyBinary;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -160,4 +163,32 @@ public class InMemoryWindowStoreTest extends AbstractWindowBytesStoreTest {
assertFalse(iterator.hasNext());
}
@Test
public void shouldMatchPositionAfterPut() {
final List<KeyValue<Integer, String>> entries = new ArrayList<>();
entries.add(new KeyValue<>(0, "v"));
entries.add(new KeyValue<>(1, "v"));
entries.add(new KeyValue<>(2, "v"));
entries.add(new KeyValue<>(3, "v"));
entries.add(new KeyValue<>(4, "v"));
final MonotonicProcessorRecordContext recordContext = new MonotonicProcessorRecordContext("input", 0);
context.setRecordContext(recordContext);
final Position expected = Position.emptyPosition();
long offset = 0;
for (final KeyValue<Integer, String> k : entries) {
windowStore.put(k.key, k.value, SEGMENT_INTERVAL);
expected.update("input", 0, offset);
offset++;
}
final MeteredWindowStore<Integer, String> meteredSessionStore = (MeteredWindowStore<Integer, String>) windowStore;
final ChangeLoggingWindowBytesStore changeLoggingSessionBytesStore = (ChangeLoggingWindowBytesStore) meteredSessionStore.wrapped();
final InMemoryWindowStore inMemoryWindowStore = (InMemoryWindowStore) changeLoggingSessionBytesStore.wrapped();
final Position actual = inMemoryWindowStore.getPosition();
assertThat(expected, is(actual));
}
}

View File

@ -22,16 +22,30 @@ import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
public class MonotonicProcessorRecordContext extends ProcessorRecordContext {
private long counter;
private final boolean automatic;
public MonotonicProcessorRecordContext(final String topic, final int partition) {
this(topic, partition, true);
}
public MonotonicProcessorRecordContext(final String topic, final int partition, final boolean automatic) {
super(0, 0, partition, topic, new RecordHeaders());
this.counter = 0;
this.automatic = automatic;
}
@Override
public long offset() {
final long ret = counter;
counter++;
if (automatic) {
counter++;
}
return ret;
}
public void kick() {
if (!automatic) {
counter++;
}
}
}

View File

@ -16,7 +16,10 @@
*/
package org.apache.kafka.streams.state.internals;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.state.KeyValueIterator;
@ -31,6 +34,8 @@ import static java.time.Duration.ofMillis;
import static org.apache.kafka.test.StreamsTestUtils.valuesToSet;
import static org.junit.Assert.assertEquals;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
public class RocksDBSessionStoreTest extends AbstractSessionBytesStoreTest {
@ -63,4 +68,30 @@ public class RocksDBSessionStoreTest extends AbstractSessionBytesStoreTest {
assertEquals(valuesToSet(iterator), new HashSet<>(Arrays.asList(2L, 3L, 4L)));
}
}
@Test
public void shouldMatchPositionAfterPut() {
final List<KeyValue<Windowed<String>, Long>> entries = new ArrayList<>();
entries.add(new KeyValue<>(new Windowed<String>("a", new SessionWindow(0, 0)), 1L));
entries.add(new KeyValue<>(new Windowed<String>("aa", new SessionWindow(0, SEGMENT_INTERVAL)), 2L));
entries.add(new KeyValue<>(new Windowed<String>("a", new SessionWindow(10, SEGMENT_INTERVAL)), 3L));
final MonotonicProcessorRecordContext recordContext = new MonotonicProcessorRecordContext("input", 0);
context.setRecordContext(recordContext);
final Position expected = Position.emptyPosition();
long offset = 0;
for (final KeyValue<Windowed<String>, Long> k : entries) {
sessionStore.put(k.key, k.value);
expected.update("input", 0, offset);
offset++;
}
final MeteredSessionStore<String, Long> meteredSessionStore = (MeteredSessionStore<String, Long>) sessionStore;
final ChangeLoggingSessionBytesStore changeLoggingSessionBytesStore = (ChangeLoggingSessionBytesStore) meteredSessionStore.wrapped();
final RocksDBSessionStore rocksDBSessionStore = (RocksDBSessionStore) changeLoggingSessionBytesStore.wrapped();
final Position actual = rocksDBSessionStore.getPosition();
assertThat(expected, is(actual));
}
}

View File

@ -40,6 +40,8 @@ import static java.time.Instant.ofEpochMilli;
import static java.util.Arrays.asList;
import static java.util.Objects.requireNonNull;
import static org.apache.kafka.test.StreamsTestUtils.valuesToSet;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -637,6 +639,34 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
);
}
@Test
public void shouldMatchPositionAfterPut() {
final List<KeyValue<Integer, String>> entries = new ArrayList<>();
entries.add(new KeyValue<>(0, "v"));
entries.add(new KeyValue<>(1, "v"));
entries.add(new KeyValue<>(2, "v"));
entries.add(new KeyValue<>(3, "v"));
entries.add(new KeyValue<>(4, "v"));
final MonotonicProcessorRecordContext recordContext = new MonotonicProcessorRecordContext("input", 0);
context.setRecordContext(recordContext);
final Position expected = Position.emptyPosition();
long offset = 0;
for (final KeyValue<Integer, String> k : entries) {
windowStore.put(k.key, k.value, SEGMENT_INTERVAL);
expected.update("input", 0, offset);
offset++;
}
final MeteredWindowStore<Integer, String> meteredSessionStore = (MeteredWindowStore<Integer, String>) windowStore;
final ChangeLoggingWindowBytesStore changeLoggingSessionBytesStore = (ChangeLoggingWindowBytesStore) meteredSessionStore.wrapped();
final RocksDBWindowStore rocksDBWindowStore = (RocksDBWindowStore) changeLoggingSessionBytesStore.wrapped();
final Position actual = rocksDBWindowStore.getPosition();
assertThat(expected, is(actual));
}
private Set<String> segmentDirs(final File baseDir) {
final File windowDir = new File(baseDir, windowStore.name());