KAFKA-13480: Track Position in KeyValue stores (#11514)

Add position tracking to KeyValue stores in support of KIP-796 (Interactive Query v2)

Reviewers: John Roesler <vvcephei@apache.org>
Author: Patrick Stuedi, 2021-11-25 01:28:00 +01:00 (committed by GitHub)
parent 0f967828e1
commit 23e9818e62
7 changed files with 307 additions and 0 deletions

streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java

@@ -21,6 +21,9 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.slf4j.Logger;
@@ -40,9 +43,12 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
private final String name;
private final NavigableMap<Bytes, byte[]> map = new TreeMap<>();
private volatile boolean open = false;
private StateStoreContext context;
private Position position;
public InMemoryKeyValueStore(final String name) {
this.name = name;
this.position = Position.emptyPosition();
}
@Override
@@ -62,6 +68,13 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
open = true;
}
@Override
public void init(final StateStoreContext context,
final StateStore root) {
init(StoreToProcessorContextAdapter.adapt(context), root);
this.context = context;
}
@Override
public boolean persistent() {
return false;
@@ -72,6 +85,10 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
return open;
}
Position getPosition() {
return position;
}
@Override
public synchronized byte[] get(final Bytes key) {
return map.get(key);
@@ -98,6 +115,11 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
} else {
map.put(key, value);
}
if (context != null && context.recordMetadata().isPresent()) {
final RecordMetadata meta = context.recordMetadata().get();
position = position.update(meta.topic(), meta.partition(), meta.offset());
}
}
@Override
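
Note the guard around the new position update: put() only advances the position when the StateStoreContext exposes record metadata, and the Optional is empty outside of active record processing (e.g. during restoration), so replayed writes do not move the position. RocksDBStore below adds the identical guard. A self-contained sketch of the pattern, using the Position class introduced in the next file (the example class and the stand-in metadata are illustrative, not part of the commit):

package org.apache.kafka.streams.state.internals;

import java.util.Optional;
import org.apache.kafka.streams.processor.api.RecordMetadata;

public class PositionGuardExample {
    // Stand-in for what StateStoreContext#recordMetadata() returns while a
    // record is being processed; the Optional is empty outside of processing.
    private static RecordMetadata meta(final String topic, final int partition, final long offset) {
        return new RecordMetadata() {
            @Override public String topic() { return topic; }
            @Override public int partition() { return partition; }
            @Override public long offset() { return offset; }
        };
    }

    public static void main(final String[] args) {
        Position position = Position.emptyPosition();
        final Optional<RecordMetadata> recordMetadata = Optional.of(meta("input", 0, 10L));
        if (recordMetadata.isPresent()) { // same guard as in put() above
            final RecordMetadata m = recordMetadata.get();
            position = position.update(m.topic(), m.partition(), m.offset());
        }
        System.out.println(position); // Position{position={input={0=10}}}
    }
}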

streams/src/main/java/org/apache/kafka/streams/state/internals/Position.java

@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
public class Position {
private final ConcurrentMap<String, ConcurrentMap<Integer, AtomicLong>> position;
public static Position emptyPosition() {
final HashMap<String, Map<Integer, Long>> pos = new HashMap<>();
return new Position(pos);
}
public static Position fromMap(final Map<String, Map<Integer, Long>> map) {
return new Position(map);
}
private Position(final Map<String, Map<Integer, Long>> other) {
this.position = new ConcurrentHashMap<>();
merge(other, (t, e) -> update(t, e.getKey(), e.getValue().longValue()));
}
public Position update(final String topic, final int partition, final long offset) {
position.computeIfAbsent(topic, k -> new ConcurrentHashMap<>());
final ConcurrentMap<Integer, AtomicLong> topicMap = position.get(topic);
topicMap.computeIfAbsent(partition, k -> new AtomicLong(0));
topicMap.get(partition).getAndAccumulate(offset, Math::max);
return this;
}
public void merge(final Position other) {
merge(other.position, (a, b) -> update(a, b.getKey(), b.getValue().longValue()));
}
@Override
public String toString() {
return "Position{" +
"position=" + position +
'}';
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final Position other = (Position) o;
final HashMap<String, HashMap<Integer, Long>> position1 = new HashMap<>();
merge(position, (t, e) -> position1.computeIfAbsent(t, k -> new HashMap<Integer, Long>()).put(e.getKey(), e.getValue().longValue()));
final HashMap<String, HashMap<Integer, Long>> position2 = new HashMap<>();
merge(other.position, (t, e) -> position2.computeIfAbsent(t, k -> new HashMap<Integer, Long>()).put(e.getKey(), e.getValue().longValue()));
return Objects.equals(position1, position2);
}
@Override
public int hashCode() {
return Objects.hash(position);
}
private void merge(final Map<String, ? extends Map<Integer, ? extends Number>> other, final BiConsumer<String, Entry<Integer, ? extends Number>> func) {
for (final Entry<String, ? extends Map<Integer, ? extends Number>> entry : other.entrySet()) {
final String topic = entry.getKey();
final Map<Integer, ? extends Number> inputMap = entry.getValue();
for (final Entry<Integer, ? extends Number> topicEntry : inputMap.entrySet()) {
func.accept(topic, topicEntry);
}
}
}
}
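
Taken together, update() ratchets each (topic, partition) entry forward via Math::max, and merge() folds a second position in componentwise, so a Position only ever moves ahead. A minimal sketch of those semantics (the example class and the concrete values are illustrative, not part of the commit):

package org.apache.kafka.streams.state.internals;

public class PositionExample {
    public static void main(final String[] args) {
        final Position a = Position.emptyPosition();
        a.update("input", 0, 5L);
        a.update("input", 0, 3L); // no-op: getAndAccumulate(offset, Math::max) keeps 5

        final Position b = Position.emptyPosition();
        b.update("input", 0, 7L);
        b.update("input", 1, 2L);

        a.merge(b); // componentwise max: a now covers {input={0=7, 1=2}}
        System.out.println(a); // prints e.g. Position{position={input={0=7, 1=2}}}
    }
}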

streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java

@@ -28,6 +28,7 @@ import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
@@ -105,6 +106,8 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
private final RocksDBMetricsRecorder metricsRecorder;
protected volatile boolean open = false;
private StateStoreContext context;
private Position position;
RocksDBStore(final String name,
final String metricsScope) {
@@ -117,6 +120,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
this.name = name;
this.parentDir = parentDir;
this.metricsRecorder = metricsRecorder;
this.position = Position.emptyPosition();
}
@SuppressWarnings("unchecked")
@@ -186,6 +190,10 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
addValueProvidersToMetricsRecorder();
}
Position getPosition() {
return position;
}
private void maybeSetUpStatistics(final Map<String, Object> configs) {
if (userSpecifiedOptions.statistics() != null) {
userSpecifiedStatistics = true;
@@ -252,6 +260,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
// value getter should always read directly from rocksDB
// since it is only for values that are already flushed
context.register(root, new RocksDBBatchingRestoreCallback(this));
this.context = context;
}
@Override
@@ -281,6 +290,11 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
Objects.requireNonNull(key, "key cannot be null");
validateStoreOpen();
dbAccessor.put(key.get(), value);
if (context != null && context.recordMetadata().isPresent()) {
final RecordMetadata meta = context.recordMetadata().get();
position = position.update(meta.topic(), meta.partition(), meta.offset());
}
}
@Override

streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java

@@ -47,6 +47,7 @@ public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {
private KeyValueStore<Bytes, byte[]> byteStore;
private final Serializer<String> stringSerializer = new StringSerializer();
private final KeyValueStoreTestDriver<Bytes, byte[]> byteStoreDriver = KeyValueStoreTestDriver.create(Bytes.class, byte[].class);
private InMemoryKeyValueStore inMemoryKeyValueStore;
@Before
public void createStringKeyValueStore() {
@@ -58,6 +59,7 @@
new Serdes.ByteArraySerde());
byteStore = storeBuilder.build();
byteStore.init(byteStoreContext, byteStore);
this.inMemoryKeyValueStore = getInMemoryStore();
}
@After
@@ -80,6 +82,10 @@
return store;
}
InMemoryKeyValueStore getInMemoryStore() {
return new InMemoryKeyValueStore("in-memory-store-test");
}
@SuppressWarnings("unchecked")
@Test
public void shouldRemoveKeysWithNullValues() {
@@ -237,4 +243,33 @@
public void shouldThrowNullPointerIfPrefixKeySerializerIsNull() {
assertThrows(NullPointerException.class, () -> byteStore.prefixScan("bb", null));
}
@Test
public void shouldMatchPositionAfterPut() {
final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
entries.add(new KeyValue<>(
new Bytes(stringSerializer.serialize(null, "1")),
stringSerializer.serialize(null, "a")));
entries.add(new KeyValue<>(
new Bytes(stringSerializer.serialize(null, "2")),
stringSerializer.serialize(null, "b")));
entries.add(new KeyValue<>(
new Bytes(stringSerializer.serialize(null, "3")),
stringSerializer.serialize(null, "c")));
final MonotonicProcessorRecordContext recordContext = new MonotonicProcessorRecordContext("input", 0);
context.setRecordContext(recordContext);
inMemoryKeyValueStore.init((StateStoreContext) context, inMemoryKeyValueStore);
final Position expected = Position.emptyPosition();
long offset = 0;
for (final KeyValue<Bytes, byte[]> k : entries) {
inMemoryKeyValueStore.put(k.key, k.value);
expected.update("input", 0, offset);
offset++;
}
final Position actual = inMemoryKeyValueStore.getPosition();
assertThat(expected, is(actual));
}
}

streams/src/test/java/org/apache/kafka/streams/state/internals/MonotonicProcessorRecordContext.java

@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
public class MonotonicProcessorRecordContext extends ProcessorRecordContext {
private long counter;
public MonotonicProcessorRecordContext(final String topic, final int partition) {
super(0, 0, partition, topic, new RecordHeaders());
this.counter = 0;
}
@Override
public long offset() {
final long ret = counter;
counter++;
return ret;
}
}
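
The helper's offset() hands out 0, 1, 2, ... on successive calls, so a store wired to this record context sees each put() land at the next offset, which is what the shouldMatchPositionAfterPut tests in InMemoryKeyValueStoreTest above and RocksDBStoreTest below rely on. A small usage sketch (the example class is illustrative, not part of the commit):

package org.apache.kafka.streams.state.internals;

public class MonotonicOffsetExample {
    public static void main(final String[] args) {
        // Mimics a task processing consecutive records from partition 0 of "input".
        final MonotonicProcessorRecordContext ctx =
            new MonotonicProcessorRecordContext("input", 0);
        System.out.println(ctx.offset()); // 0
        System.out.println(ctx.offset()); // 1
        System.out.println(ctx.offset()); // 2
    }
}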

streams/src/test/java/org/apache/kafka/streams/state/internals/PositionTest.java

@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import java.io.IOException;
import org.junit.Test;
public class PositionTest {
private final String topic = "topic";
@Test
public void shouldMatchOnEqual() throws IOException {
final Position position1 = Position.emptyPosition();
final Position position2 = Position.emptyPosition();
position1.update("topic1", 0, 1);
position2.update("topic1", 0, 1);
position1.update("topic1", 1, 2);
position2.update("topic1", 1, 2);
position1.update("topic1", 2, 1);
position2.update("topic1", 2, 1);
position1.update("topic2", 0, 0);
position2.update("topic2", 0, 0);
assertEquals(position1, position2);
}
@Test
public void shouldNotMatchOnUnEqual() throws IOException {
final Position position1 = Position.emptyPosition();
final Position position2 = Position.emptyPosition();
position1.update("topic1", 0, 1);
position2.update("topic1", 0, 1);
position1.update("topic1", 1, 2);
position1.update("topic1", 2, 1);
position2.update("topic1", 2, 1);
position1.update("topic2", 0, 0);
position2.update("topic2", 0, 0);
assertNotEquals(position1, position2);
}
}

streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java

@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
import java.util.Optional;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.MetricConfig;
@@ -110,6 +111,7 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
InternalMockProcessorContext context;
RocksDBStore rocksDBStore;
Position position;
@Before
public void setUp() {
@@ -123,6 +125,7 @@
new StreamsConfig(props)
);
rocksDBStore = getRocksDBStore();
position = rocksDBStore.getPosition();
}
@After
@@ -383,6 +386,35 @@
rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3")))));
}
@Test
public void shouldMatchPositionAfterPut() {
final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
entries.add(new KeyValue<>(
new Bytes(stringSerializer.serialize(null, "1")),
stringSerializer.serialize(null, "a")));
entries.add(new KeyValue<>(
new Bytes(stringSerializer.serialize(null, "2")),
stringSerializer.serialize(null, "b")));
entries.add(new KeyValue<>(
new Bytes(stringSerializer.serialize(null, "3")),
stringSerializer.serialize(null, "c")));
final MonotonicProcessorRecordContext recordContext = new MonotonicProcessorRecordContext("input", 0);
context.setRecordContext(recordContext);
rocksDBStore.init((StateStoreContext) context, rocksDBStore);
final Position expected = Position.emptyPosition();
long offset = 0;
for (final KeyValue<Bytes, byte[]> k : entries) {
rocksDBStore.put(k.key, k.value);
expected.update("input", 0, offset);
offset++;
}
final Position actual = rocksDBStore.getPosition();
assertEquals(expected, actual);
}
@Test
public void shouldReturnKeysWithGivenPrefix() {
final List<KeyValue<Bytes, byte[]>> entries = new ArrayList<>();
@@ -786,6 +818,8 @@
EasyMock.expect(context.appConfigs())
.andStubReturn(new StreamsConfig(StreamsTestUtils.getStreamsConfig()).originals());
EasyMock.expect(context.stateDir()).andStubReturn(dir);
final MonotonicProcessorRecordContext processorRecordContext = new MonotonicProcessorRecordContext("test", 0);
EasyMock.expect(context.recordMetadata()).andStubReturn(Optional.of(processorRecordContext));
EasyMock.replay(context);
rocksDBStore.init((StateStoreContext) context, rocksDBStore);
@@ -818,6 +852,8 @@
EasyMock.expect(context.appConfigs())
.andStubReturn(new StreamsConfig(StreamsTestUtils.getStreamsConfig()).originals());
EasyMock.expect(context.stateDir()).andStubReturn(dir);
final MonotonicProcessorRecordContext processorRecordContext = new MonotonicProcessorRecordContext("test", 0);
EasyMock.expect(context.recordMetadata()).andStubReturn(Optional.of(processorRecordContext));
EasyMock.replay(context);
rocksDBStore.init((StateStoreContext) context, rocksDBStore);
@@ -981,4 +1017,5 @@
return result;
}
}