From 23e9818e625976c22fe6d4297a5ab76b01f92ef6 Mon Sep 17 00:00:00 2001 From: Patrick Stuedi Date: Thu, 25 Nov 2021 01:28:00 +0100 Subject: [PATCH] KAFKA-13480: Track Position in KeyValue stores (#11514) Add position tracking to KeyValue stores in support of KIP-796 Reviewers: John Roesler --- .../internals/InMemoryKeyValueStore.java | 22 +++++ .../streams/state/internals/Position.java | 96 +++++++++++++++++++ .../streams/state/internals/RocksDBStore.java | 14 +++ .../internals/InMemoryKeyValueStoreTest.java | 35 +++++++ .../MonotonicProcessorRecordContext.java | 37 +++++++ .../streams/state/internals/PositionTest.java | 66 +++++++++++++ .../state/internals/RocksDBStoreTest.java | 37 +++++++ 7 files changed, 307 insertions(+) create mode 100644 streams/src/main/java/org/apache/kafka/streams/state/internals/Position.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/state/internals/MonotonicProcessorRecordContext.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/state/internals/PositionTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java index f0c6dbedc58..cd6d29ff88d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java @@ -21,6 +21,9 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.RecordMetadata; +import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.slf4j.Logger; @@ -40,9 +43,12 @@ public class InMemoryKeyValueStore implements KeyValueStore { private final String name; private final NavigableMap map = new TreeMap<>(); private volatile boolean open = false; + private StateStoreContext context; + private Position position; public InMemoryKeyValueStore(final String name) { this.name = name; + this.position = Position.emptyPosition(); } @Override @@ -62,6 +68,13 @@ public class InMemoryKeyValueStore implements KeyValueStore { open = true; } + @Override + public void init(final StateStoreContext context, + final StateStore root) { + init(StoreToProcessorContextAdapter.adapt(context), root); + this.context = context; + } + @Override public boolean persistent() { return false; @@ -72,6 +85,10 @@ public class InMemoryKeyValueStore implements KeyValueStore { return open; } + Position getPosition() { + return position; + } + @Override public synchronized byte[] get(final Bytes key) { return map.get(key); @@ -98,6 +115,11 @@ public class InMemoryKeyValueStore implements KeyValueStore { } else { map.put(key, value); } + + if (context != null && context.recordMetadata().isPresent()) { + final RecordMetadata meta = context.recordMetadata().get(); + position = position.update(meta.topic(), meta.partition(), meta.offset()); + } } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Position.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Position.java new file mode 100644 index 00000000000..dfb1c61caa7 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Position.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + + +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; + +public class Position { + private final ConcurrentMap> position; + + public static Position emptyPosition() { + final HashMap> pos = new HashMap<>(); + return new Position(pos); + } + + public static Position fromMap(final Map> map) { + return new Position(map); + } + + private Position(final Map> other) { + this.position = new ConcurrentHashMap<>(); + merge(other, (t, e) -> update(t, e.getKey(), e.getValue().longValue())); + } + + public Position update(final String topic, final int partition, final long offset) { + position.computeIfAbsent(topic, k -> new ConcurrentHashMap<>()); + final ConcurrentMap topicMap = position.get(topic); + topicMap.computeIfAbsent(partition, k -> new AtomicLong(0)); + topicMap.get(partition).getAndAccumulate(offset, Math::max); + return this; + } + + public void merge(final Position other) { + merge(other.position, (a, b) -> update(a, b.getKey(), b.getValue().longValue())); + } + + @Override + public String toString() { + return "Position{" + + "position=" + position + + '}'; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final Position other = (Position) o; + final HashMap> position1 = new HashMap<>(); + merge(position, (t, e) -> position1.computeIfAbsent(t, k -> new HashMap()).put(e.getKey(), e.getValue().longValue())); + final HashMap> position2 = new HashMap<>(); + merge(other.position, (t, e) -> position2.computeIfAbsent(t, k -> new HashMap()).put(e.getKey(), e.getValue().longValue())); + + return Objects.equals(position1, position2); + } + + @Override + public int hashCode() { + return Objects.hash(position); + } + + private void merge(final Map> other, final BiConsumer> func) { + for (final Entry> entry : other.entrySet()) { + final String topic = entry.getKey(); + final Map inputMap = entry.getValue(); + for (final Entry topicEntry : inputMap.entrySet()) { + func.accept(topic, topicEntry); + } + } + } +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index aa1b1ba09f0..93dd93bd761 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -28,6 +28,7 @@ import org.apache.kafka.streams.processor.BatchingStateRestoreCallback; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.RocksDBConfigSetter; @@ -105,6 +106,8 @@ public class RocksDBStore implements KeyValueStore, BatchWritingS private final RocksDBMetricsRecorder metricsRecorder; protected volatile boolean open = false; + private StateStoreContext context; + private Position position; RocksDBStore(final String name, final String metricsScope) { @@ -117,6 +120,7 @@ public class RocksDBStore implements KeyValueStore, BatchWritingS this.name = name; this.parentDir = parentDir; this.metricsRecorder = metricsRecorder; + this.position = Position.emptyPosition(); } @SuppressWarnings("unchecked") @@ -186,6 +190,10 @@ public class RocksDBStore implements KeyValueStore, BatchWritingS addValueProvidersToMetricsRecorder(); } + Position getPosition() { + return position; + } + private void maybeSetUpStatistics(final Map configs) { if (userSpecifiedOptions.statistics() != null) { userSpecifiedStatistics = true; @@ -252,6 +260,7 @@ public class RocksDBStore implements KeyValueStore, BatchWritingS // value getter should always read directly from rocksDB // since it is only for values that are already flushed context.register(root, new RocksDBBatchingRestoreCallback(this)); + this.context = context; } @Override @@ -281,6 +290,11 @@ public class RocksDBStore implements KeyValueStore, BatchWritingS Objects.requireNonNull(key, "key cannot be null"); validateStoreOpen(); dbAccessor.put(key.get(), value); + + if (context != null && context.recordMetadata().isPresent()) { + final RecordMetadata meta = context.recordMetadata().get(); + position = position.update(meta.topic(), meta.partition(), meta.offset()); + } } @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java index d67d665134a..dc162d4cece 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java @@ -47,6 +47,7 @@ public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest { private KeyValueStore byteStore; private final Serializer stringSerializer = new StringSerializer(); private final KeyValueStoreTestDriver byteStoreDriver = KeyValueStoreTestDriver.create(Bytes.class, byte[].class); + private InMemoryKeyValueStore inMemoryKeyValueStore; @Before public void createStringKeyValueStore() { @@ -58,6 +59,7 @@ public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest { new Serdes.ByteArraySerde()); byteStore = storeBuilder.build(); byteStore.init(byteStoreContext, byteStore); + this.inMemoryKeyValueStore = getInMemoryStore(); } @After @@ -80,6 +82,10 @@ public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest { return store; } + InMemoryKeyValueStore getInMemoryStore() { + return new InMemoryKeyValueStore("in-memory-store-test"); + } + @SuppressWarnings("unchecked") @Test public void shouldRemoveKeysWithNullValues() { @@ -237,4 +243,33 @@ public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest { public void shouldThrowNullPointerIfPrefixKeySerializerIsNull() { assertThrows(NullPointerException.class, () -> byteStore.prefixScan("bb", null)); } + + @Test + public void shouldMatchPositionAfterPut() { + final List> entries = new ArrayList<>(); + entries.add(new KeyValue<>( + new Bytes(stringSerializer.serialize(null, "1")), + stringSerializer.serialize(null, "a"))); + entries.add(new KeyValue<>( + new Bytes(stringSerializer.serialize(null, "2")), + stringSerializer.serialize(null, "b"))); + entries.add(new KeyValue<>( + new Bytes(stringSerializer.serialize(null, "3")), + stringSerializer.serialize(null, "c"))); + + final MonotonicProcessorRecordContext recordContext = new MonotonicProcessorRecordContext("input", 0); + context.setRecordContext(recordContext); + inMemoryKeyValueStore.init((StateStoreContext) context, inMemoryKeyValueStore); + + final Position expected = Position.emptyPosition(); + long offset = 0; + for (final KeyValue k : entries) { + inMemoryKeyValueStore.put(k.key, k.value); + expected.update("input", 0, offset); + offset++; + } + + final Position actual = inMemoryKeyValueStore.getPosition(); + assertThat(expected, is(actual)); + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MonotonicProcessorRecordContext.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MonotonicProcessorRecordContext.java new file mode 100644 index 00000000000..27cbdd19397 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MonotonicProcessorRecordContext.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; + +public class MonotonicProcessorRecordContext extends ProcessorRecordContext { + private long counter; + + public MonotonicProcessorRecordContext(final String topic, final int partition) { + super(0, 0, partition, topic, new RecordHeaders()); + this.counter = 0; + } + + @Override + public long offset() { + final long ret = counter; + counter++; + return ret; + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/PositionTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/PositionTest.java new file mode 100644 index 00000000000..e5126099069 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/PositionTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.state.internals; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +import java.io.IOException; +import org.junit.Test; + +public class PositionTest { + + private final String topic = "topic"; + + @Test + public void shouldMatchOnEqual() throws IOException { + final Position position1 = Position.emptyPosition(); + final Position position2 = Position.emptyPosition(); + position1.update("topic1", 0, 1); + position2.update("topic1", 0, 1); + + position1.update("topic1", 1, 2); + position2.update("topic1", 1, 2); + + position1.update("topic1", 2, 1); + position2.update("topic1", 2, 1); + + position1.update("topic2", 0, 0); + position2.update("topic2", 0, 0); + + assertEquals(position1, position2); + } + + @Test + public void shouldNotMatchOnUnEqual() throws IOException { + final Position position1 = Position.emptyPosition(); + final Position position2 = Position.emptyPosition(); + position1.update("topic1", 0, 1); + position2.update("topic1", 0, 1); + + position1.update("topic1", 1, 2); + + position1.update("topic1", 2, 1); + position2.update("topic1", 2, 1); + + position1.update("topic2", 0, 0); + position2.update("topic2", 0, 0); + + assertNotEquals(position1, position2); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java index 066f080c764..28090503f95 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import java.util.Optional; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.MetricConfig; @@ -110,6 +111,7 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest { InternalMockProcessorContext context; RocksDBStore rocksDBStore; + Position position; @Before public void setUp() { @@ -123,6 +125,7 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest { new StreamsConfig(props) ); rocksDBStore = getRocksDBStore(); + position = rocksDBStore.getPosition(); } @After @@ -383,6 +386,35 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest { rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3"))))); } + @Test + public void shouldMatchPositionAfterPut() { + final List> entries = new ArrayList<>(); + entries.add(new KeyValue<>( + new Bytes(stringSerializer.serialize(null, "1")), + stringSerializer.serialize(null, "a"))); + entries.add(new KeyValue<>( + new Bytes(stringSerializer.serialize(null, "2")), + stringSerializer.serialize(null, "b"))); + entries.add(new KeyValue<>( + new Bytes(stringSerializer.serialize(null, "3")), + stringSerializer.serialize(null, "c"))); + + final MonotonicProcessorRecordContext recordContext = new MonotonicProcessorRecordContext("input", 0); + context.setRecordContext(recordContext); + rocksDBStore.init((StateStoreContext) context, rocksDBStore); + + final Position expected = Position.emptyPosition(); + long offset = 0; + for (final KeyValue k : entries) { + rocksDBStore.put(k.key, k.value); + expected.update("input", 0, offset); + offset++; + } + + final Position actual = rocksDBStore.getPosition(); + assertEquals(expected, actual); + } + @Test public void shouldReturnKeysWithGivenPrefix() { final List> entries = new ArrayList<>(); @@ -786,6 +818,8 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest { EasyMock.expect(context.appConfigs()) .andStubReturn(new StreamsConfig(StreamsTestUtils.getStreamsConfig()).originals()); EasyMock.expect(context.stateDir()).andStubReturn(dir); + final MonotonicProcessorRecordContext processorRecordContext = new MonotonicProcessorRecordContext("test", 0); + EasyMock.expect(context.recordMetadata()).andStubReturn(Optional.of(processorRecordContext)); EasyMock.replay(context); rocksDBStore.init((StateStoreContext) context, rocksDBStore); @@ -818,6 +852,8 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest { EasyMock.expect(context.appConfigs()) .andStubReturn(new StreamsConfig(StreamsTestUtils.getStreamsConfig()).originals()); EasyMock.expect(context.stateDir()).andStubReturn(dir); + final MonotonicProcessorRecordContext processorRecordContext = new MonotonicProcessorRecordContext("test", 0); + EasyMock.expect(context.recordMetadata()).andStubReturn(Optional.of(processorRecordContext)); EasyMock.replay(context); rocksDBStore.init((StateStoreContext) context, rocksDBStore); @@ -981,4 +1017,5 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest { return result; } + }