KAFKA-3060: Refactor MeteredStore and RocksDBStore Impl

Changes include:

1) Move the logging logic from MeteredXXXStore into the internal stores, and keep the WindowStore API clean by removing all internalPut/Get functions.

2) Wrap the common logging behavior of the in-memory and LRU cache stores into a single class (InMemoryKeyValueLoggedStore).

3) Fix a bug in StoreChangeLogger where byte arrays cannot be reliably tracked in a HashSet (Java arrays use identity-based equality and hashing), by introducing a dedicated RawStoreChangeLogger; see the sketch after this list.

4) Add a caching layer with object caching on top of RocksDBStore; it relies on the cached objects' equals and hashCode methods being consistent with the serdes (illustrated in the second sketch below, after the "Closes" line).
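
The following is a minimal, self-contained Java sketch (not part of this patch) illustrating the problem behind item 3: arrays inherit identity-based equals() and hashCode(), so a HashSet<byte[]> cannot deduplicate equal key encodings, whereas a TreeSet with an explicit unsigned lexicographic comparator, in the spirit of the new RawStoreChangeLogger, can.

// Sketch only: shows why byte[] keys cannot be tracked in a HashSet and how a
// comparator-based TreeSet (as RawStoreChangeLogger uses) collapses equal encodings.
import java.util.Comparator;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

public class ByteArrayKeySetDemo {
    public static void main(String[] args) {
        byte[] k1 = {1, 2, 3};
        byte[] k2 = {1, 2, 3};

        Set<byte[]> hashed = new HashSet<>();
        hashed.add(k1);
        hashed.add(k2);
        System.out.println(hashed.size());   // prints 2: the same key is tracked twice

        // Unsigned lexicographic byte comparison, like the ByteArrayComparator in the patch.
        Comparator<byte[]> unsignedLexicographic = (left, right) -> {
            for (int i = 0; i < left.length && i < right.length; i++) {
                int diff = (left[i] & 0xff) - (right[i] & 0xff);
                if (diff != 0)
                    return diff;
            }
            return left.length - right.length;
        };

        Set<byte[]> sorted = new TreeSet<>(unsignedLexicographic);
        sorted.add(k1);
        sorted.add(k2);
        System.out.println(sorted.size());   // prints 1: equal encodings collapse to one entry
    }
}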

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Yasuhiro Matsuda <yasuhiro@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #826 from guozhangwang/K3060
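
For item 4, a minimal hypothetical sketch of the contract the object cache relies on (the UserKey class below is illustrative, not part of the patch): equals() and hashCode() must depend on exactly the state the serde writes, so that an object-level cache hit agrees with what a serialize/deserialize round trip through RocksDB would return.

// Hypothetical key class (not from the patch): equality is defined over exactly the
// field the serde serializes, keeping the RocksDBStore object cache consistent.
import java.nio.charset.StandardCharsets;
import java.util.Objects;

public final class UserKey {
    private final String userId;

    public UserKey(String userId) {
        this.userId = userId;
    }

    // What a serde for this type would write: only userId.
    public byte[] toBytes() {
        return userId.getBytes(StandardCharsets.UTF_8);
    }

    public static UserKey fromBytes(byte[] bytes) {
        return new UserKey(new String(bytes, StandardCharsets.UTF_8));
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof UserKey)) return false;
        return Objects.equals(userId, ((UserKey) o).userId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(userId);
    }
}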
Authored by Guozhang Wang on 2016-02-01 16:11:13 -08:00; committed by Ewen Cheslack-Postava
parent 66ecf3f08d
commit 57da044a99
36 changed files with 1223 additions and 550 deletions


@ -25,7 +25,6 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Count;
import org.apache.kafka.streams.kstream.HoppingWindows;
import org.apache.kafka.streams.kstream.KStreamBuilder;
@ -33,6 +32,7 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.StreamsConfig;
import java.util.Properties;
@ -121,7 +121,7 @@ public class PageViewTypedJob {
// write to the result topic
regionCount.to("streams-pageviewstats-output", new JsonPOJOSerializer<>(), new JsonPOJOSerializer<>());
KafkaStreams kstream = new KafkaStreams(builder, props);
kstream.start();
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
}
}


@ -53,8 +53,6 @@ public class PageViewUnTypedJob {
props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
StreamsConfig config = new StreamsConfig(props);
KStreamBuilder builder = new KStreamBuilder();
final Serializer<String> stringSerializer = new StringSerializer();
@ -101,7 +99,7 @@ public class PageViewUnTypedJob {
// write to the result topic
regionCount.to("streams-pageviewstats-output");
KafkaStreams kstream = new KafkaStreams(builder, config);
kstream.start();
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
}
}


@ -44,7 +44,7 @@ public class PipeJob {
builder.stream("streams-file-input").to("streams-pipe-output");
KafkaStreams kstream = new KafkaStreams(builder, props);
kstream.start();
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
}
}


@ -97,7 +97,7 @@ public class WordCountJob {
counts.to("streams-wordcount-output", stringSerializer, JsonSerializer);
KafkaStreams kstream = new KafkaStreams(builder, props);
kstream.start();
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
}
}


@ -157,6 +157,7 @@ public class StreamsConfig extends AbstractConfig {
ConsumerConfig.VALUE_DESERIALIZER_CLASS_DOC)
.define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
Type.CLASS,
WALLCLOCK_TIMESTAMP_EXTRACTOR,
Importance.MEDIUM,
TIMESTAMP_EXTRACTOR_CLASS_DOC)
.define(PARTITION_GROUPER_CLASS_CONFIG,


@ -36,9 +36,9 @@ import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
import org.apache.kafka.streams.state.Serdes;
import org.apache.kafka.streams.state.Stores;
import java.lang.reflect.Array;
import java.util.HashSet;
@ -304,23 +304,19 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
RocksDBWindowStoreSupplier<K, V> thisWindow =
new RocksDBWindowStoreSupplier<>(
windows.name() + "-this",
windows.maintainMs(),
windows.segments,
true,
new Serdes<>("", keySerializer, keyDeserializer, thisValueSerializer, thisValueDeserializer),
null);
StateStoreSupplier thisWindow = Stores.create(windows.name() + "-this")
.withKeys(keySerializer, keyDeserializer)
.withValues(otherValueSerializer, otherValueDeserializer)
.persistent()
.windowed(windows.maintainMs(), windows.segments, true)
.build();
RocksDBWindowStoreSupplier<K, V1> otherWindow =
new RocksDBWindowStoreSupplier<>(
windows.name() + "-other",
windows.maintainMs(),
windows.segments,
true,
new Serdes<>("", keySerializer, keyDeserializer, otherValueSerializer, otherValueDeserializer),
null);
StateStoreSupplier otherWindow = Stores.create(windows.name() + "-other")
.withKeys(keySerializer, keyDeserializer)
.withValues(otherValueSerializer, otherValueDeserializer)
.persistent()
.windowed(windows.maintainMs(), windows.segments, true)
.build();
KStreamJoinWindow<K, V> thisWindowedStream = new KStreamJoinWindow<>(thisWindow.name(), windows.before + windows.after + 1, windows.maintainMs());
KStreamJoinWindow<K, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name(), windows.before + windows.after + 1, windows.maintainMs());
@ -360,14 +356,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
RocksDBWindowStoreSupplier<K, V1> otherWindow =
new RocksDBWindowStoreSupplier<>(
windows.name() + "-this",
windows.maintainMs(),
windows.segments,
true,
new Serdes<>("", keySerializer, keyDeserializer, otherValueSerializer, otherValueDeserializer),
null);
StateStoreSupplier otherWindow = Stores.create(windows.name() + "-other")
.withKeys(keySerializer, keyDeserializer)
.withValues(otherValueSerializer, otherValueDeserializer)
.persistent()
.windowed(windows.maintainMs(), windows.segments, true)
.build();
KStreamJoinWindow<K, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name(), windows.before + windows.after + 1, windows.maintainMs());
KStreamKStreamJoin<K, R, V, V1> joinThis = new KStreamKStreamJoin<>(otherWindow.name(), windows.before, windows.after, joiner, true);
@ -410,14 +404,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
ProcessorSupplier<K, V> aggWindowSupplier = new KStreamAggWindow<>();
ProcessorSupplier<Windowed<K>, Change<V>> aggregateSupplier = new KStreamReduce<>(windows, windows.name(), reducer);
RocksDBWindowStoreSupplier<K, V> aggregateStore =
new RocksDBWindowStoreSupplier<>(
windows.name(),
windows.maintainMs(),
windows.segments,
false,
new Serdes<>("", keySerializer, keyDeserializer, aggValueSerializer, aggValueDeserializer),
null);
StateStoreSupplier aggregateStore = Stores.create(windows.name())
.withKeys(keySerializer, keyDeserializer)
.withValues(aggValueSerializer, aggValueDeserializer)
.persistent()
.windowed(windows.maintainMs(), windows.segments, false)
.build();
// aggregate the values with the aggregator and local store
topology.addProcessor(selectName, aggWindowSupplier, this.name);
@ -444,14 +436,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
ProcessorSupplier<K, V> aggWindowSupplier = new KStreamAggWindow<>();
ProcessorSupplier<Windowed<K>, Change<V>> aggregateSupplier = new KStreamAggregate<>(windows, windows.name(), aggregator);
RocksDBWindowStoreSupplier<K, T> aggregateStore =
new RocksDBWindowStoreSupplier<>(
windows.name(),
windows.maintainMs(),
windows.segments,
false,
new Serdes<>("", keySerializer, keyDeserializer, aggValueSerializer, aggValueDeserializer),
null);
StateStoreSupplier aggregateStore = Stores.create(windows.name())
.withKeys(keySerializer, keyDeserializer)
.withValues(aggValueSerializer, aggValueDeserializer)
.persistent()
.windowed(windows.maintainMs(), windows.segments, false)
.build();
// aggregate the values with the aggregator and local store
topology.addProcessor(selectName, aggWindowSupplier, this.name);


@ -273,7 +273,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
StateStoreSupplier aggregateStore = Stores.create(name)
.withKeys(keySerializer, keyDeserializer)
.withValues(aggValueSerializer, aggValueDeserializer)
.localDatabase()
.persistent()
.build();
// select the aggregate key and values (old and new), it would require parent to send old values
@ -322,7 +322,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
StateStoreSupplier aggregateStore = Stores.create(name)
.withKeys(keySerializer, keyDeserializer)
.withValues(valueSerializer, valueDeserializer)
.localDatabase()
.persistent()
.build();
// select the aggregate key and values (old and new), it would require parent to send old values


@ -52,7 +52,7 @@ public class KTableStoreSupplier<K, V> implements StateStoreSupplier {
}
public StateStore get() {
return new MeteredKeyValueStore<>(new RocksDBStore<>(name, serdes), serdes, "rocksdb-state", time).disableLogging();
return new MeteredKeyValueStore<>(new RocksDBStore<>(name, serdes), "rocksdb-state", time);
}
}


@ -30,6 +30,7 @@ import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStoreSupplier;
import org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreSupplier;
import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
/**
* Factory for creating key-value stores.
@ -76,10 +77,27 @@ public class Stores {
}
@Override
public LocalDatabaseKeyValueFactory<K, V> localDatabase() {
return new LocalDatabaseKeyValueFactory<K, V>() {
public PersistentKeyValueFactory<K, V> persistent() {
return new PersistentKeyValueFactory<K, V>() {
private int numSegments = 0;
private long retentionPeriod = 0L;
private boolean retainDuplicates = false;
@Override
public PersistentKeyValueFactory<K, V> windowed(long retentionPeriod, int numSegments, boolean retainDuplicates) {
this.numSegments = numSegments;
this.retentionPeriod = retentionPeriod;
this.retainDuplicates = retainDuplicates;
return this;
}
@Override
public StateStoreSupplier build() {
if (numSegments > 0) {
return new RocksDBWindowStoreSupplier<>(name, retentionPeriod, numSegments, retainDuplicates, serdes, null);
}
return new RocksDBKeyValueStoreSupplier<>(name, serdes, null);
}
};
@ -237,7 +255,7 @@ public class Stores {
*
* @return the factory to create in-memory key-value stores; never null
*/
LocalDatabaseKeyValueFactory<K, V> localDatabase();
PersistentKeyValueFactory<K, V> persistent();
}
/**
@ -270,7 +288,17 @@ public class Stores {
* @param <K> the type of keys
* @param <V> the type of values
*/
public static interface LocalDatabaseKeyValueFactory<K, V> {
public static interface PersistentKeyValueFactory<K, V> {
/**
* Set the persistent store as a windowed key-value store
*
* @param retentionPeriod the maximum period of time in milli-second to keep each window in this store
* @param numSegments the maximum number of segments for rolling the windowed store
* @param retainDuplicates whether or not to retain duplicate data within the window
*/
PersistentKeyValueFactory<K, V> windowed(long retentionPeriod, int numSegments, boolean retainDuplicates);
/**
* Return the instance of StateStoreSupplier of new key-value store.
* @return the key-value store; never null


@ -27,12 +27,5 @@ public interface WindowStore<K, V> extends StateStore {
void put(K key, V value, long timestamp);
byte[] putAndReturnInternalKey(K key, V value, long timestamp);
WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
void putInternal(byte[] binaryKey, byte[] binaryValue);
byte[] getInternal(byte[] binaryKey);
}


@ -21,7 +21,7 @@ package org.apache.kafka.streams.state;
import java.nio.ByteBuffer;
public class WindowStoreUtil<K, V> {
public class WindowStoreUtils<K, V> {
public static final int TIMESTAMP_SIZE = 8;
public static final int SEQNUM_SIZE = 4;
@ -51,5 +51,4 @@ public class WindowStoreUtil<K, V> {
public static long timestampFromBinaryKey(byte[] binaryKey) {
return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
}
}


@ -0,0 +1,132 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Serdes;
import java.util.List;
public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
private final KeyValueStore<K, V> inner;
private final Serdes<K, V> serdes;
private final String topic;
private StoreChangeLogger<K, V> changeLogger;
private StoreChangeLogger.ValueGetter<K, V> getter;
public InMemoryKeyValueLoggedStore(final String topic, final KeyValueStore<K, V> inner, final Serdes<K, V> serdes) {
this.topic = topic;
this.inner = inner;
this.serdes = serdes;
}
@Override
public String name() {
return this.topic;
}
@Override
public void init(ProcessorContext context) {
this.changeLogger = new StoreChangeLogger<>(topic, context, serdes);
inner.init(context);
this.getter = new StoreChangeLogger.ValueGetter<K, V>() {
@Override
public V get(K key) {
return inner.get(key);
}
};
}
@Override
public boolean persistent() {
return inner.persistent();
}
@Override
public V get(K key) {
return this.inner.get(key);
}
@Override
public void put(K key, V value) {
this.inner.put(key, value);
changeLogger.add(key);
changeLogger.maybeLogChange(this.getter);
}
@Override
public void putAll(List<KeyValue<K, V>> entries) {
this.inner.putAll(entries);
for (KeyValue<K, V> entry : entries) {
K key = entry.key;
changeLogger.add(key);
}
changeLogger.maybeLogChange(this.getter);
}
@Override
public V delete(K key) {
V value = this.inner.delete(key);
removed(key);
return value;
}
/**
* Called when the underlying {@link #inner} {@link KeyValueStore} removes an entry in response to a call from this
* store.
*
* @param key the key for the entry that the inner store removed
*/
protected void removed(K key) {
changeLogger.delete(key);
changeLogger.maybeLogChange(this.getter);
}
@Override
public KeyValueIterator<K, V> range(K from, K to) {
return this.inner.range(from, to);
}
@Override
public KeyValueIterator<K, V> all() {
return this.inner.all();
}
@Override
public void close() {
inner.close();
}
@Override
public void flush() {
this.inner.flush();
changeLogger.logChange(getter);
}
}


@ -20,6 +20,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
@ -43,13 +44,13 @@ import java.util.TreeMap;
public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
private final String name;
private final Serdes serdes;
private final Time time;
private final Serdes<K, V> serdes;
public InMemoryKeyValueStoreSupplier(String name, Serdes<K, V> serdes, Time time) {
this.name = name;
this.serdes = serdes;
this.time = time;
this.serdes = serdes;
}
public String name() {
@ -57,7 +58,7 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
}
public StateStore get() {
return new MeteredKeyValueStore<K, V>(new MemoryStore<K, V>(name), serdes, "in-memory-state", time);
return new MeteredKeyValueStore<>(new MemoryStore<K, V>(name).enableLogging(serdes), "in-memory-state", time);
}
private static class MemoryStore<K, V> implements KeyValueStore<K, V> {
@ -65,12 +66,22 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
private final String name;
private final NavigableMap<K, V> map;
private boolean loggingEnabled = false;
private Serdes<K, V> serdes = null;
public MemoryStore(String name) {
super();
this.name = name;
this.map = new TreeMap<>();
}
public KeyValueStore<K, V> enableLogging(Serdes<K, V> serdes) {
this.loggingEnabled = true;
this.serdes = serdes;
return new InMemoryKeyValueLoggedStore<>(this.name, this, serdes);
}
@Override
public String name() {
return this.name;
@ -78,7 +89,16 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
@Override
public void init(ProcessorContext context) {
// do-nothing since it is in-memory
if (loggingEnabled) {
context.register(this, true, new StateRestoreCallback() {
@Override
public void restore(byte[] key, byte[] value) {
put(serdes.keyFrom(key), serdes.valueFrom(value));
}
});
}
}
@Override


@ -17,21 +17,10 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Serdes;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;
/**
* An in-memory key-value store that is limited in size and retains a maximum number of most recently used entries.
*
@ -43,7 +32,7 @@ public class InMemoryLRUCacheStoreSupplier<K, V> implements StateStoreSupplier {
private final String name;
private final int capacity;
private final Serdes serdes;
private final Serdes<K, V> serdes;
private final Time time;
public InMemoryLRUCacheStoreSupplier(String name, int capacity, Serdes<K, V> serdes, Time time) {
@ -57,143 +46,17 @@ public class InMemoryLRUCacheStoreSupplier<K, V> implements StateStoreSupplier {
return name;
}
@SuppressWarnings("unchecked")
public StateStore get() {
MemoryLRUCache<K, V> cache = new MemoryLRUCache<K, V>(name, capacity);
final MeteredKeyValueStore<K, V> store = new MeteredKeyValueStore<>(cache, serdes, "in-memory-lru-state", time);
cache.whenEldestRemoved(new EldestEntryRemovalListener<K, V>() {
final MemoryNavigableLRUCache<K, V> cache = new MemoryNavigableLRUCache<K, V>(name, capacity);
final InMemoryKeyValueLoggedStore<K, V> loggedCache = (InMemoryKeyValueLoggedStore) cache.enableLogging(serdes);
final MeteredKeyValueStore<K, V> store = new MeteredKeyValueStore<>(loggedCache, "in-memory-lru-state", time);
cache.whenEldestRemoved(new MemoryNavigableLRUCache.EldestEntryRemovalListener<K, V>() {
@Override
public void apply(K key, V value) {
store.removed(key);
loggedCache.removed(key);
}
});
return store;
}
private static interface EldestEntryRemovalListener<K, V> {
public void apply(K key, V value);
}
protected static final class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
private final String name;
private final Map<K, V> map;
private final NavigableSet<K> keys;
private EldestEntryRemovalListener<K, V> listener;
public MemoryLRUCache(String name, final int maxCacheSize) {
this.name = name;
this.keys = new TreeSet<>();
// leave room for one extra entry to handle adding an entry before the oldest can be removed
this.map = new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) {
private static final long serialVersionUID = 1L;
@Override
protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
if (size() > maxCacheSize) {
K key = eldest.getKey();
keys.remove(key);
if (listener != null) listener.apply(key, eldest.getValue());
return true;
}
return false;
}
};
}
protected void whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) {
this.listener = listener;
}
@Override
public String name() {
return this.name;
}
@Override
public void init(ProcessorContext context) {
// do-nothing since it is in-memory
}
@Override
public boolean persistent() {
return false;
}
@Override
public V get(K key) {
return this.map.get(key);
}
@Override
public void put(K key, V value) {
this.map.put(key, value);
this.keys.add(key);
}
@Override
public void putAll(List<KeyValue<K, V>> entries) {
for (KeyValue<K, V> entry : entries)
put(entry.key, entry.value);
}
@Override
public V delete(K key) {
V value = this.map.remove(key);
this.keys.remove(key);
return value;
}
@Override
public KeyValueIterator<K, V> range(K from, K to) {
return new MemoryLRUCache.CacheIterator<K, V>(this.keys.subSet(from, true, to, false).iterator(), this.map);
}
@Override
public KeyValueIterator<K, V> all() {
return new MemoryLRUCache.CacheIterator<K, V>(this.keys.iterator(), this.map);
}
@Override
public void flush() {
// do-nothing since it is in-memory
}
@Override
public void close() {
// do-nothing
}
private static class CacheIterator<K, V> implements KeyValueIterator<K, V> {
private final Iterator<K> keys;
private final Map<K, V> entries;
private K lastKey;
public CacheIterator(Iterator<K> keys, Map<K, V> entries) {
this.keys = keys;
this.entries = entries;
}
@Override
public boolean hasNext() {
return keys.hasNext();
}
@Override
public KeyValue<K, V> next() {
lastKey = keys.next();
return new KeyValue<>(lastKey, entries.get(lastKey));
}
@Override
public void remove() {
keys.remove();
entries.remove(lastKey);
}
@Override
public void close() {
// do nothing
}
}
}
}


@ -0,0 +1,151 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Serdes;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
public interface EldestEntryRemovalListener<K, V> {
void apply(K key, V value);
}
protected String name;
protected Map<K, V> map;
protected Set<K> keys;
protected EldestEntryRemovalListener<K, V> listener;
private boolean loggingEnabled = false;
private Serdes<K, V> serdes = null;
// this is used for extended MemoryNavigableLRUCache only
public MemoryLRUCache() {}
public MemoryLRUCache(String name, final int maxCacheSize) {
this.name = name;
this.keys = new HashSet<>();
// leave room for one extra entry to handle adding an entry before the oldest can be removed
this.map = new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) {
private static final long serialVersionUID = 1L;
@Override
protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
if (size() > maxCacheSize) {
K key = eldest.getKey();
keys.remove(key);
if (listener != null) listener.apply(key, eldest.getValue());
return true;
}
return false;
}
};
}
public KeyValueStore<K, V> enableLogging(Serdes<K, V> serdes) {
this.loggingEnabled = true;
this.serdes = serdes;
return new InMemoryKeyValueLoggedStore<>(this.name, this, serdes);
}
public MemoryLRUCache<K, V> whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) {
this.listener = listener;
return this;
}
@Override
public String name() {
return this.name;
}
@Override
public void init(ProcessorContext context) {
if (loggingEnabled) {
context.register(this, true, new StateRestoreCallback() {
@Override
public void restore(byte[] key, byte[] value) {
put(serdes.keyFrom(key), serdes.valueFrom(value));
}
});
}
}
@Override
public boolean persistent() {
return false;
}
@Override
public V get(K key) {
return this.map.get(key);
}
@Override
public void put(K key, V value) {
this.map.put(key, value);
this.keys.add(key);
}
@Override
public void putAll(List<KeyValue<K, V>> entries) {
for (KeyValue<K, V> entry : entries)
put(entry.key, entry.value);
}
@Override
public V delete(K key) {
V value = this.map.remove(key);
this.keys.remove(key);
return value;
}
@Override
public KeyValueIterator<K, V> range(K from, K to) {
throw new UnsupportedOperationException("MemoryLRUCache does not support range() function.");
}
@Override
public KeyValueIterator<K, V> all() {
throw new UnsupportedOperationException("MemoryLRUCache does not support all() function.");
}
@Override
public void flush() {
// do-nothing since it is in-memory
}
@Override
public void close() {
// do-nothing since it is in-memory
}
}


@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;
public class MemoryNavigableLRUCache<K, V> extends MemoryLRUCache<K, V> {
public MemoryNavigableLRUCache(String name, final int maxCacheSize) {
super();
this.name = name;
this.keys = new TreeSet<>();
// leave room for one extra entry to handle adding an entry before the oldest can be removed
this.map = new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) {
private static final long serialVersionUID = 1L;
@Override
protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
if (size() > maxCacheSize) {
K key = eldest.getKey();
keys.remove(key);
if (listener != null) listener.apply(key, eldest.getValue());
return true;
}
return false;
}
};
}
@Override
public MemoryNavigableLRUCache<K, V> whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) {
this.listener = listener;
return this;
}
@SuppressWarnings("unchecked")
@Override
public KeyValueIterator<K, V> range(K from, K to) {
return new MemoryNavigableLRUCache.CacheIterator<K, V>(((NavigableSet) this.keys).subSet(from, true, to, false).iterator(), this.map);
}
@Override
public KeyValueIterator<K, V> all() {
return new MemoryNavigableLRUCache.CacheIterator<K, V>(this.keys.iterator(), this.map);
}
private static class CacheIterator<K, V> implements KeyValueIterator<K, V> {
private final Iterator<K> keys;
private final Map<K, V> entries;
private K lastKey;
public CacheIterator(Iterator<K> keys, Map<K, V> entries) {
this.keys = keys;
this.entries = entries;
}
@Override
public boolean hasNext() {
return keys.hasNext();
}
@Override
public KeyValue<K, V> next() {
lastKey = keys.next();
return new KeyValue<>(lastKey, entries.get(lastKey));
}
@Override
public void remove() {
keys.remove();
entries.remove(lastKey);
}
@Override
public void close() {
// do nothing
}
}
}


@ -17,25 +17,27 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Serdes;
import java.util.List;
/**
* Metered KeyValueStore wrapper is used for recording operation metrics, and hence its
* inner KeyValueStore implementation do not need to provide its own metrics collecting functionality.
*
* @param <K>
* @param <V>
*/
public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
protected final KeyValueStore<K, V> inner;
protected final StoreChangeLogger.ValueGetter getter;
protected final Serdes<K, V> serialization;
protected final String metricScope;
protected final Time time;
@ -49,27 +51,13 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
private Sensor restoreTime;
private StreamsMetrics metrics;
private boolean loggingEnabled = true;
private StoreChangeLogger<K, V> changeLogger = null;
// always wrap the store with the metered store
public MeteredKeyValueStore(final KeyValueStore<K, V> inner, Serdes<K, V> serialization, String metricScope, Time time) {
public MeteredKeyValueStore(final KeyValueStore<K, V> inner, String metricScope, Time time) {
this.inner = inner;
this.getter = new StoreChangeLogger.ValueGetter<K, V>() {
public V get(K key) {
return inner.get(key);
}
};
this.serialization = serialization;
this.metricScope = metricScope;
this.time = time != null ? time : new SystemTime();
}
public MeteredKeyValueStore<K, V> disableLogging() {
loggingEnabled = false;
return this;
}
@Override
public String name() {
return inner.name();
@ -88,22 +76,10 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
this.flushTime = this.metrics.addLatencySensor(metricScope, name, "flush");
this.restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore");
this.changeLogger = this.loggingEnabled ? new StoreChangeLogger<>(name, context, serialization) : null;
// register and possibly restore the state from the logs
long startNs = time.nanoseconds();
inner.init(context);
try {
final Deserializer<K> keyDeserializer = serialization.keyDeserializer();
final Deserializer<V> valDeserializer = serialization.valueDeserializer();
context.register(this, loggingEnabled, new StateRestoreCallback() {
@Override
public void restore(byte[] key, byte[] value) {
inner.put(keyDeserializer.deserialize(name, key),
valDeserializer.deserialize(name, value));
}
});
inner.init(context);
} finally {
this.metrics.recordLatency(this.restoreTime, startNs, time.nanoseconds());
}
@ -129,11 +105,6 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
long startNs = time.nanoseconds();
try {
this.inner.put(key, value);
if (loggingEnabled) {
changeLogger.add(key);
changeLogger.maybeLogChange(this.getter);
}
} finally {
this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
}
@ -144,14 +115,6 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
long startNs = time.nanoseconds();
try {
this.inner.putAll(entries);
if (loggingEnabled) {
for (KeyValue<K, V> entry : entries) {
K key = entry.key;
changeLogger.add(key);
}
changeLogger.maybeLogChange(this.getter);
}
} finally {
this.metrics.recordLatency(this.putAllTime, startNs, time.nanoseconds());
}
@ -163,27 +126,12 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
try {
V value = this.inner.delete(key);
removed(key);
return value;
} finally {
this.metrics.recordLatency(this.deleteTime, startNs, time.nanoseconds());
}
}
/**
* Called when the underlying {@link #inner} {@link KeyValueStore} removes an entry in response to a call from this
* store.
*
* @param key the key for the entry that the inner store removed
*/
protected void removed(K key) {
if (loggingEnabled) {
changeLogger.delete(key);
changeLogger.maybeLogChange(this.getter);
}
}
@Override
public KeyValueIterator<K, V> range(K from, K to) {
return new MeteredKeyValueIterator<K, V>(this.inner.range(from, to), this.rangeTime);
@ -204,9 +152,6 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
long startNs = time.nanoseconds();
try {
this.inner.flush();
if (loggingEnabled)
changeLogger.logChange(this.getter);
} finally {
this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds());
}
@ -247,7 +192,5 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
metrics.recordLatency(this.sensor, this.startNs, time.nanoseconds());
}
}
}
}


@ -23,45 +23,28 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.state.Serdes;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
protected final WindowStore<K, V> inner;
protected final StoreChangeLogger.ValueGetter<byte[], byte[]> getter;
protected final String metricScope;
protected final Time time;
private Sensor putTime;
private Sensor getTime;
private Sensor rangeTime;
private Sensor fetchTime;
private Sensor flushTime;
private Sensor restoreTime;
private StreamsMetrics metrics;
private boolean loggingEnabled = true;
private StoreChangeLogger<byte[], byte[]> changeLogger = null;
// always wrap the store with the metered store
public MeteredWindowStore(final WindowStore<K, V> inner, String metricScope, Time time) {
this.inner = inner;
this.getter = new StoreChangeLogger.ValueGetter<byte[], byte[]>() {
public byte[] get(byte[] key) {
return inner.getInternal(key);
}
};
this.metricScope = metricScope;
this.time = time != null ? time : new SystemTime();
}
public MeteredWindowStore<K, V> disableLogging() {
loggingEnabled = false;
return this;
}
@Override
public String name() {
return inner.name();
@ -72,24 +55,14 @@ public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
final String name = name();
this.metrics = context.metrics();
this.putTime = this.metrics.addLatencySensor(metricScope, name, "put");
this.getTime = this.metrics.addLatencySensor(metricScope, name, "get");
this.rangeTime = this.metrics.addLatencySensor(metricScope, name, "range");
this.fetchTime = this.metrics.addLatencySensor(metricScope, name, "fetch");
this.flushTime = this.metrics.addLatencySensor(metricScope, name, "flush");
this.restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore");
this.changeLogger = this.loggingEnabled ?
new StoreChangeLogger<>(name, context, Serdes.withBuiltinTypes("", byte[].class, byte[].class)) : null;
// register and possibly restore the state from the logs
long startNs = time.nanoseconds();
inner.init(context);
try {
context.register(this, loggingEnabled, new StateRestoreCallback() {
@Override
public void restore(byte[] key, byte[] value) {
inner.putInternal(key, value);
}
});
inner.init(context);
} finally {
this.metrics.recordLatency(this.restoreTime, startNs, time.nanoseconds());
}
@ -102,48 +75,26 @@ public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
@Override
public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) {
return new MeteredWindowStoreIterator<>(this.inner.fetch(key, timeFrom, timeTo), this.rangeTime);
return new MeteredWindowStoreIterator<>(this.inner.fetch(key, timeFrom, timeTo), this.fetchTime);
}
@Override
public void put(K key, V value) {
putAndReturnInternalKey(key, value, -1L);
}
@Override
public void put(K key, V value, long timestamp) {
putAndReturnInternalKey(key, value, timestamp);
}
@Override
public byte[] putAndReturnInternalKey(K key, V value, long timestamp) {
long startNs = time.nanoseconds();
try {
byte[] binKey = this.inner.putAndReturnInternalKey(key, value, timestamp);
if (loggingEnabled) {
changeLogger.add(binKey);
changeLogger.maybeLogChange(this.getter);
}
return binKey;
this.inner.put(key, value);
} finally {
this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
}
}
@Override
public void putInternal(byte[] binaryKey, byte[] binaryValue) {
inner.putInternal(binaryKey, binaryValue);
}
@Override
public byte[] getInternal(byte[] binaryKey) {
public void put(K key, V value, long timestamp) {
long startNs = time.nanoseconds();
try {
return this.inner.getInternal(binaryKey);
this.inner.put(key, value, timestamp);
} finally {
this.metrics.recordLatency(this.getTime, startNs, time.nanoseconds());
this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
}
}
@ -157,9 +108,6 @@ public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
long startNs = time.nanoseconds();
try {
this.inner.flush();
if (loggingEnabled)
changeLogger.logChange(this.getter);
} finally {
this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds());
}


@ -100,7 +100,7 @@ public class OffsetCheckpoint {
public Map<TopicPartition, Long> read() throws IOException {
synchronized (lock) {
BufferedReader reader = null;
BufferedReader reader;
try {
reader = new BufferedReader(new FileReader(file));
} catch (FileNotFoundException e) {
@ -136,8 +136,7 @@ public class OffsetCheckpoint {
throw new IllegalArgumentException("Unknown offset checkpoint version: " + version);
}
} finally {
if (reader != null)
reader.close();
reader.close();
}
}
}
@ -146,8 +145,7 @@ public class OffsetCheckpoint {
String line = reader.readLine();
if (line == null)
throw new EOFException("File ended prematurely.");
int val = Integer.parseInt(line);
return val;
return Integer.parseInt(line);
}
public void delete() throws IOException {


@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.WindowStoreUtils;
import java.util.Comparator;
import java.util.TreeSet;
public class RawStoreChangeLogger extends StoreChangeLogger<byte[], byte[]> {
private class ByteArrayComparator implements Comparator<byte[]> {
@Override
public int compare(byte[] left, byte[] right) {
for (int i = 0, j = 0; i < left.length && j < right.length; i++, j++) {
int a = left[i] & 0xff;
int b = right[j] & 0xff;
if (a != b)
return a - b;
}
return left.length - right.length;
}
}
public RawStoreChangeLogger(String topic, ProcessorContext context) {
this(topic, context, DEFAULT_WRITE_BATCH_SIZE, DEFAULT_WRITE_BATCH_SIZE);
}
public RawStoreChangeLogger(String topic, ProcessorContext context, int maxDirty, int maxRemoved) {
super(topic, context, context.id().partition, WindowStoreUtils.INNER_SERDES, maxDirty, maxRemoved);
init();
}
@Override
public void init() {
this.dirty = new TreeSet<>(new ByteArrayComparator());
this.removed = new TreeSet<>(new ByteArrayComparator());
}
}


@ -33,7 +33,7 @@ import org.apache.kafka.streams.state.Serdes;
public class RocksDBKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
private final String name;
private final Serdes serdes;
private final Serdes<K, V> serdes;
private final Time time;
public RocksDBKeyValueStoreSupplier(String name, Serdes<K, V> serdes, Time time) {
@ -47,7 +47,6 @@ public class RocksDBKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
}
public StateStore get() {
return new MeteredKeyValueStore<>(new RocksDBStore<K, V>(name, serdes), serdes, "rocksdb-state", time);
return new MeteredKeyValueStore<>(new RocksDBStore<>(name, serdes).enableLogging(), "rocksdb-state", time);
}
}


@ -20,9 +20,11 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Serdes;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.CompactionStyle;
import org.rocksdb.CompressionType;
@ -31,18 +33,23 @@ import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
import java.io.File;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
private static final int TTL_NOT_USED = -1;
// TODO: these values should be configurable
private static final int DEFAULT_UNENCODED_CACHE_SIZE = 1000;
private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
private static final long WRITE_BUFFER_SIZE = 32 * 1024 * 1024L;
@ -58,11 +65,31 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
private final WriteOptions wOptions;
private final FlushOptions fOptions;
private Serdes<K, V> serdes;
private ProcessorContext context;
private Serdes<K, V> serdes;
protected File dbDir;
private RocksDB db;
private boolean loggingEnabled = false;
private int cacheSize = DEFAULT_UNENCODED_CACHE_SIZE;
private Set<K> cacheDirtyKeys;
private MemoryLRUCache<K, RocksDBCacheEntry> cache;
private StoreChangeLogger<byte[], byte[]> changeLogger;
private StoreChangeLogger.ValueGetter<byte[], byte[]> getter;
public KeyValueStore<K, V> enableLogging() {
loggingEnabled = true;
return this;
}
public RocksDBStore<K, V> withCacheSize(int cacheSize) {
this.cacheSize = cacheSize;
return this;
}
public RocksDBStore(String name, Serdes<K, V> serdes) {
this.name = name;
this.serdes = serdes;
@ -88,10 +115,63 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
fOptions.setWaitForFlush(true);
}
private class RocksDBCacheEntry {
public V value;
public boolean isDirty;
public RocksDBCacheEntry(V value) {
this(value, false);
}
public RocksDBCacheEntry(V value, boolean isDirty) {
this.value = value;
this.isDirty = isDirty;
}
}
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
this.context = context;
this.dbDir = new File(new File(this.context.stateDir(), DB_FILE_DIR), this.name);
this.db = openDB(this.dbDir, this.options, TTL_SECONDS);
this.changeLogger = this.loggingEnabled ? new RawStoreChangeLogger(name, context) : null;
if (this.cacheSize > 0) {
this.cache = new MemoryLRUCache<K, RocksDBCacheEntry>(name, cacheSize)
.whenEldestRemoved(new MemoryLRUCache.EldestEntryRemovalListener<K, RocksDBCacheEntry>() {
@Override
public void apply(K key, RocksDBCacheEntry entry) {
// flush all the dirty entries to RocksDB if this evicted entry is dirty
if (entry.isDirty) {
flush();
}
}
});
this.cacheDirtyKeys = new HashSet<>();
} else {
this.cache = null;
this.cacheDirtyKeys = null;
}
// value getter should always read directly from rocksDB
// since it is only for values that are already flushed
this.getter = new StoreChangeLogger.ValueGetter<byte[], byte[]>() {
@Override
public byte[] get(byte[] key) {
return getInternal(key);
}
};
context.register(this, loggingEnabled, new StateRestoreCallback() {
@Override
public void restore(byte[] key, byte[] value) {
putInternal(key, value);
}
});
}
private RocksDB openDB(File dir, Options options, int ttl) {
@ -100,12 +180,11 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
dir.getParentFile().mkdirs();
return RocksDB.open(options, dir.toString());
} else {
throw new ProcessorStateException("Change log is not supported for store " + this.name + " since it is TTL based.");
throw new UnsupportedOperationException("Change log is not supported for store " + this.name + " since it is TTL based.");
// TODO: support TTL with change log?
// return TtlDB.open(options, dir.toString(), ttl, false);
}
} catch (RocksDBException e) {
// TODO: this needs to be handled more accurately
throw new ProcessorStateException("Error opening store " + this.name + " at location " + dir.toString(), e);
}
}
@ -122,25 +201,64 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
@Override
public V get(K key) {
if (cache != null) {
RocksDBCacheEntry entry = cache.get(key);
if (entry == null) {
byte[] rawKey = serdes.rawKey(key);
V value = serdes.valueFrom(getInternal(serdes.rawKey(key)));
cache.put(key, new RocksDBCacheEntry(value));
return value;
} else {
return entry.value;
}
} else {
return serdes.valueFrom(getInternal(serdes.rawKey(key)));
}
}
private byte[] getInternal(byte[] rawKey) {
try {
return serdes.valueFrom(this.db.get(serdes.rawKey(key)));
return this.db.get(rawKey);
} catch (RocksDBException e) {
// TODO: this needs to be handled more accurately
throw new ProcessorStateException("Error while executing get " + key.toString() + " from store " + this.name, e);
throw new ProcessorStateException("Error while getting value for key " + serdes.keyFrom(rawKey) +
" from store " + this.name, e);
}
}
@Override
public void put(K key, V value) {
try {
if (value == null) {
db.remove(wOptions, serdes.rawKey(key));
} else {
db.put(wOptions, serdes.rawKey(key), serdes.rawValue(value));
if (cache != null) {
cache.put(key, new RocksDBCacheEntry(value, true));
cacheDirtyKeys.add(key);
} else {
byte[] rawKey = serdes.rawKey(key);
byte[] rawValue = serdes.rawValue(value);
putInternal(rawKey, rawValue);
}
}
private void putInternal(byte[] rawKey, byte[] rawValue) {
if (rawValue == null) {
try {
db.remove(wOptions, rawKey);
} catch (RocksDBException e) {
throw new ProcessorStateException("Error while removing key " + serdes.keyFrom(rawKey) +
" from store " + this.name, e);
}
} catch (RocksDBException e) {
// TODO: this needs to be handled more accurately
throw new ProcessorStateException("Error while executing put " + key.toString() + " from store " + this.name, e);
} else {
try {
db.put(wOptions, rawKey, rawValue);
} catch (RocksDBException e) {
throw new ProcessorStateException("Error while executing put key " + serdes.keyFrom(rawKey) +
" and value " + serdes.keyFrom(rawValue) + " from store " + this.name, e);
}
}
if (loggingEnabled) {
changeLogger.add(rawKey);
changeLogger.maybeLogChange(this.getter);
}
}
@ -150,6 +268,21 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
put(entry.key, entry.value);
}
// this function is only called in flush()
private void putAllInternal(List<KeyValue<byte[], byte[]>> entries) {
WriteBatch batch = new WriteBatch();
for (KeyValue<byte[], byte[]> entry : entries) {
batch.put(entry.key, entry.value);
}
try {
db.write(wOptions, batch);
} catch (RocksDBException e) {
throw new ProcessorStateException("Error while batch writing to store " + this.name, e);
}
}
@Override
public V delete(K key) {
V value = get(key);
@ -159,11 +292,19 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
@Override
public KeyValueIterator<K, V> range(K from, K to) {
// we need to flush the cache if necessary before returning the iterator
if (cache != null)
flush();
return new RocksDBRangeIterator<K, V>(db.newIterator(), serdes, from, to);
}
@Override
public KeyValueIterator<K, V> all() {
// we need to flush the cache if necessary before returning the iterator
if (cache != null)
flush();
RocksIterator innerIter = db.newIterator();
innerIter.seekToFirst();
return new RocksDbIterator<K, V>(innerIter, serdes);
@ -171,10 +312,60 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
@Override
public void flush() {
// flush of the cache entries if necessary
if (cache != null) {
List<KeyValue<byte[], byte[]>> putBatch = new ArrayList<>(cache.keys.size());
List<byte[]> deleteBatch = new ArrayList<>(cache.keys.size());
for (K key : cacheDirtyKeys) {
RocksDBCacheEntry entry = cache.get(key);
assert entry.isDirty;
byte[] rawKey = serdes.rawKey(key);
if (entry.value != null) {
putBatch.add(new KeyValue<>(rawKey, serdes.rawValue(entry.value)));
} else {
deleteBatch.add(rawKey);
}
}
putAllInternal(putBatch);
if (loggingEnabled) {
for (KeyValue<byte[], byte[]> kv : putBatch)
changeLogger.add(kv.key);
}
// check all removed entries and remove them in rocksDB
// TODO: can this be done in batch as well?
for (byte[] removedKey : deleteBatch) {
try {
db.remove(wOptions, removedKey);
} catch (RocksDBException e) {
throw new ProcessorStateException("Error while deleting with key " + serdes.keyFrom(removedKey) + " from store " + this.name, e);
}
if (loggingEnabled) {
changeLogger.delete(removedKey);
}
}
// reset dirty set
cacheDirtyKeys.clear();
}
flushInternal();
if (loggingEnabled)
changeLogger.logChange(getter);
}
public void flushInternal() {
try {
db.flush(fOptions);
} catch (RocksDBException e) {
// TODO: this needs to be handled more accurately
throw new ProcessorStateException("Error while executing flush from store " + this.name, e);
}
}


@ -22,11 +22,13 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.Serdes;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.WindowStoreUtil;
import org.apache.kafka.streams.state.WindowStoreUtils;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@ -46,7 +48,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
public final long id;
Segment(String name, long id) {
super(name, WindowStoreUtil.INNER_SERDES);
super(name, WindowStoreUtils.INNER_SERDES);
this.id = id;
}
@ -61,7 +63,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
private int index = 0;
RocksDBWindowStoreIterator(Serdes<?, V> serdes) {
this(serdes, WindowStoreUtil.NO_ITERATORS);
this(serdes, WindowStoreUtils.NO_ITERATORS);
}
RocksDBWindowStoreIterator(Serdes<?, V> serdes, KeyValueIterator<byte[], byte[]>[] iterators) {
@ -87,7 +89,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
KeyValue<byte[], byte[]> kv = iterators[index].next();
return new KeyValue<>(WindowStoreUtil.timestampFromBinaryKey(kv.key),
return new KeyValue<>(WindowStoreUtils.timestampFromBinaryKey(kv.key),
serdes.valueFrom(kv.value));
}
@ -111,10 +113,14 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
private final Segment[] segments;
private final Serdes<K, V> serdes;
private final SimpleDateFormat formatter;
private final StoreChangeLogger.ValueGetter<byte[], byte[]> getter;
private ProcessorContext context;
private long currentSegmentId = -1L;
private int seqnum = 0;
private long currentSegmentId = -1L;
private boolean loggingEnabled = false;
private StoreChangeLogger<byte[], byte[]> changeLogger = null;
public RocksDBWindowStore(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serdes<K, V> serdes) {
this.name = name;
@ -127,11 +133,23 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
this.retainDuplicates = retainDuplicates;
this.getter = new StoreChangeLogger.ValueGetter<byte[], byte[]>() {
public byte[] get(byte[] key) {
return getInternal(key);
}
};
// Create a date formatter. Formatted timestamps are used as segment name suffixes
this.formatter = new SimpleDateFormat("yyyyMMddHHmm");
this.formatter.setTimeZone(new SimpleTimeZone(0, "GMT"));
}
public RocksDBWindowStore<K, V> enableLogging() {
loggingEnabled = true;
return this;
}
@Override
public String name() {
return name;
@ -140,6 +158,17 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
@Override
public void init(ProcessorContext context) {
this.context = context;
this.changeLogger = this.loggingEnabled ?
new RawStoreChangeLogger(name, context) : null;
// register and possibly restore the state from the logs
context.register(this, loggingEnabled, new StateRestoreCallback() {
@Override
public void restore(byte[] key, byte[] value) {
putInternal(key, value);
}
});
}
@Override
@ -153,6 +182,9 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
if (segment != null)
segment.flush();
}
if (loggingEnabled)
changeLogger.logChange(this.getter);
}
@Override
@ -165,16 +197,25 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
@Override
public void put(K key, V value) {
putAndReturnInternalKey(key, value, USE_CURRENT_TIMESTAMP);
byte[] rawKey = putAndReturnInternalKey(key, value, USE_CURRENT_TIMESTAMP);
if (loggingEnabled) {
changeLogger.add(rawKey);
changeLogger.maybeLogChange(this.getter);
}
}
@Override
public void put(K key, V value, long timestamp) {
putAndReturnInternalKey(key, value, timestamp);
byte[] rawKey = putAndReturnInternalKey(key, value, timestamp);
if (loggingEnabled) {
changeLogger.add(rawKey);
changeLogger.maybeLogChange(this.getter);
}
}
@Override
public byte[] putAndReturnInternalKey(K key, V value, long t) {
private byte[] putAndReturnInternalKey(K key, V value, long t) {
long timestamp = t == USE_CURRENT_TIMESTAMP ? context.timestamp() : t;
long segmentId = segmentId(timestamp);
@ -189,7 +230,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
if (segmentId > currentSegmentId - segments.length) {
if (retainDuplicates)
seqnum = (seqnum + 1) & 0x7FFFFFFF;
byte[] binaryKey = WindowStoreUtil.toBinaryKey(key, timestamp, seqnum, serdes);
byte[] binaryKey = WindowStoreUtils.toBinaryKey(key, timestamp, seqnum, serdes);
getSegment(segmentId).put(binaryKey, serdes.rawValue(value));
return binaryKey;
} else {
@ -197,9 +238,8 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
}
}
@Override
public void putInternal(byte[] binaryKey, byte[] binaryValue) {
long segmentId = segmentId(WindowStoreUtil.timestampFromBinaryKey(binaryKey));
private void putInternal(byte[] binaryKey, byte[] binaryValue) {
long segmentId = segmentId(WindowStoreUtils.timestampFromBinaryKey(binaryKey));
if (segmentId > currentSegmentId) {
// A new segment will be created. Clean up old segments first.
@ -212,9 +252,8 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
getSegment(segmentId).put(binaryKey, binaryValue);
}
@Override
public byte[] getInternal(byte[] binaryKey) {
long segmentId = segmentId(WindowStoreUtil.timestampFromBinaryKey(binaryKey));
private byte[] getInternal(byte[] binaryKey) {
long segmentId = segmentId(WindowStoreUtils.timestampFromBinaryKey(binaryKey));
Segment segment = segments[(int) (segmentId % segments.length)];
@ -231,8 +270,8 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
long segFrom = segmentId(timeFrom);
long segTo = segmentId(Math.max(0L, timeTo));
byte[] binaryFrom = WindowStoreUtil.toBinaryKey(key, timeFrom, 0, serdes);
byte[] binaryUntil = WindowStoreUtil.toBinaryKey(key, timeTo + 1L, 0, serdes);
byte[] binaryFrom = WindowStoreUtils.toBinaryKey(key, timeFrom, 0, serdes);
byte[] binaryUntil = WindowStoreUtils.toBinaryKey(key, timeTo + 1L, 0, serdes);
ArrayList<KeyValueIterator<byte[], byte[]>> iterators = new ArrayList<>();
@ -271,15 +310,16 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
}
}
public long segmentId(long timestamp) {
private long segmentId(long timestamp) {
return timestamp / segmentInterval;
}
// this method is defined public since it is used for unit tests
public String directorySuffix(long segmentId) {
return formatter.format(new Date(segmentId * segmentInterval));
}
// this method is used by a test
// this method is defined public since it is used for unit tests
public Set<Long> segmentIds() {
HashSet<Long> segmentIds = new HashSet<>();
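Note: the window-store changes above lean on WindowStoreUtils.toBinaryKey and WindowStoreUtils.timestampFromBinaryKey, whose bodies are outside this diff. As a reading aid, here is a minimal sketch of the composite-key layout those helpers imply (serialized key followed by a fixed-width timestamp and sequence number). The exact field widths are an assumption, not code from this commit.

import java.nio.ByteBuffer;

// Sketch only: assumed layout [ serialized key | 8-byte timestamp | 4-byte seqnum ].
// The actual WindowStoreUtils may differ; this just shows why a timestamp can be
// recovered from the tail of the binary key without deserializing the key itself.
public final class WindowKeySketch {

    static byte[] toBinaryKey(byte[] serializedKey, long timestamp, int seqnum) {
        ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + 8 + 4);
        buf.put(serializedKey);
        buf.putLong(timestamp);   // placed after the key so range scans stay key-prefixed
        buf.putInt(seqnum);       // distinguishes duplicates when retainDuplicates is true
        return buf.array();
    }

    static long timestampFromBinaryKey(byte[] binaryKey) {
        // the timestamp sits 12 bytes from the end: 8 for itself plus 4 for the seqnum
        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - 12);
    }

    public static void main(String[] args) {
        byte[] key = toBinaryKey(new byte[]{1, 2, 3}, 60000L, 0);
        System.out.println(timestampFromBinaryKey(key)); // 60000
    }
}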

View File

@ -36,7 +36,7 @@ public class RocksDBWindowStoreSupplier<K, V> implements StateStoreSupplier {
private final long retentionPeriod;
private final boolean retainDuplicates;
private final int numSegments;
private final Serdes serdes;
private final Serdes<K, V> serdes;
private final Time time;
public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serdes<K, V> serdes, Time time) {
@ -53,7 +53,7 @@ public class RocksDBWindowStoreSupplier<K, V> implements StateStoreSupplier {
}
public StateStore get() {
return new MeteredWindowStore<>(new RocksDBWindowStore<K, V>(name, retentionPeriod, numSegments, retainDuplicates, serdes), "rocksdb-window", time);
return new MeteredWindowStore<>(new RocksDBWindowStore<>(name, retentionPeriod, numSegments, retainDuplicates, serdes).enableLogging(), "rocksdb-window", time);
}
}

View File

@ -32,28 +32,41 @@ public class StoreChangeLogger<K, V> {
V get(K key);
}
// TODO: these values should be configurable
protected static final int DEFAULT_WRITE_BATCH_SIZE = 100;
protected final Serdes<K, V> serialization;
private final Set<K> dirty;
private final Set<K> removed;
private final String topic;
private final int partition;
private final ProcessorContext context;
private final int maxDirty;
private final int maxRemoved;
private final String topic;
private int partition;
private ProcessorContext context;
protected Set<K> dirty;
protected Set<K> removed;
// always wrap the logged store with the metered store
public StoreChangeLogger(String topic, ProcessorContext context, Serdes<K, V> serialization) {
this.topic = topic;
this.serialization = serialization;
this.context = context;
this.partition = context.id().partition;
this(topic, context, serialization, DEFAULT_WRITE_BATCH_SIZE, DEFAULT_WRITE_BATCH_SIZE);
}
public StoreChangeLogger(String topic, ProcessorContext context, Serdes<K, V> serialization, int maxDirty, int maxRemoved) {
this(topic, context, context.id().partition, serialization, maxDirty, maxRemoved);
init();
}
protected StoreChangeLogger(String topic, ProcessorContext context, int partition, Serdes<K, V> serialization, int maxDirty, int maxRemoved) {
this.topic = topic;
this.context = context;
this.partition = partition;
this.serialization = serialization;
this.maxDirty = maxDirty;
this.maxRemoved = maxRemoved;
}
public void init() {
this.dirty = new HashSet<>();
this.removed = new HashSet<>();
this.maxDirty = 100; // TODO: this needs to be configurable
this.maxRemoved = 100; // TODO: this needs to be configurable
}
public void add(K key) {
@ -89,4 +102,18 @@ public class StoreChangeLogger<K, V> {
}
}
public void clear() {
this.removed.clear();
this.dirty.clear();
}
// this is for test only
public int numDirty() {
return this.dirty.size();
}
// this is for test only
public int numRemoved() {
return this.removed.size();
}
}
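Note: the RawStoreChangeLogger used by RocksDBWindowStore.init() and by StoreChangeLoggerTest is not shown in these hunks. The motivation is that byte[] does not implement content-based equals/hashCode, so the HashSet-backed dirty/removed sets created in init() cannot deduplicate raw keys. The standalone sketch below illustrates the problem and one content-based alternative; it is an illustration, not the class shipped in this commit.

import java.util.Comparator;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

// Sketch only: why byte[] keys need a content-aware set, and one way to get it.
public final class RawKeySetSketch {

    // lexicographic, unsigned comparison of byte arrays
    static final Comparator<byte[]> BYTES = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int diff = (a[i] & 0xff) - (b[i] & 0xff);
            if (diff != 0)
                return diff;
        }
        return a.length - b.length;
    };

    public static void main(String[] args) {
        Set<byte[]> identityBased = new HashSet<>();
        identityBased.add(new byte[]{1, 2});
        identityBased.add(new byte[]{1, 2});
        System.out.println(identityBased.size()); // 2: equal contents are treated as distinct keys

        Set<byte[]> contentBased = new TreeSet<>(BYTES);
        contentBased.add(new byte[]{1, 2});
        contentBased.add(new byte[]{1, 2});
        System.out.println(contentBased.size());  // 1: deduplicated, so each key is logged once
    }
}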

View File

@ -37,7 +37,7 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StateUtils;
import org.apache.kafka.streams.state.StateTestUtils;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.ProcessorTopologyTestDriver;
@ -65,7 +65,7 @@ public class ProcessorTopologyTest {
@Before
public void setup() {
// Create a new directory in which we'll put all of the state for this test, enabling running tests in parallel ...
File localState = StateUtils.tempDir();
File localState = StateTestUtils.tempDir();
Properties props = new Properties();
props.setProperty(StreamsConfig.JOB_ID_CONFIG, "processor-topology-test");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");

View File

@ -219,7 +219,6 @@ public class KeyValueStoreTestDriver<K, V> {
return new KeyValueStoreTestDriver<K, V>(serdes);
}
private final Serdes<K, V> serdes;
private final Map<K, V> flushedEntries = new HashMap<>();
private final Set<K> flushedRemovals = new HashSet<>();
private final List<KeyValue<K, V>> restorableEntries = new LinkedList<>();
@ -238,22 +237,40 @@ public class KeyValueStoreTestDriver<K, V> {
private final RecordCollector recordCollector;
private File stateDir = null;
protected KeyValueStoreTestDriver(Serdes<K, V> serdes) {
this.serdes = serdes;
protected KeyValueStoreTestDriver(final Serdes<K, V> serdes) {
ByteArraySerializer rawSerializer = new ByteArraySerializer();
Producer<byte[], byte[]> producer = new MockProducer<byte[], byte[]>(true, rawSerializer, rawSerializer);
Producer<byte[], byte[]> producer = new MockProducer<>(true, rawSerializer, rawSerializer);
this.recordCollector = new RecordCollector(producer) {
@SuppressWarnings("unchecked")
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
recordFlushed(record.key(), record.value());
// byte[] keys and values are deserialized through the serdes so they can be compared by content
K key;
if (record.key() instanceof byte[]) {
key = serdes.keyFrom((byte[]) record.key());
} else {
key = (K) record.key();
}
V value;
if (record.value() instanceof byte[]) {
value = serdes.valueFrom((byte[]) record.value());
} else {
value = (V) record.value();
}
recordFlushed(key, value);
}
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer,
StreamPartitioner<K1, V1> partitioner) {
recordFlushed(record.key(), record.value());
// ignore partitioner
send(record, keySerializer, valueSerializer);
}
};
this.stateDir = StateUtils.tempDir();
this.stateDir = StateTestUtils.tempDir();
this.stateDir.mkdirs();
Properties props = new Properties();
@ -279,7 +296,7 @@ public class KeyValueStoreTestDriver<K, V> {
@Override
public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback func) {
storeMap.put(store.name(), store);
restoreEntries(func);
restoreEntries(func, serdes);
}
@Override
@ -299,21 +316,19 @@ public class KeyValueStoreTestDriver<K, V> {
};
}
@SuppressWarnings("unchecked")
protected <K1, V1> void recordFlushed(K1 key, V1 value) {
K k = (K) key;
protected void recordFlushed(K key, V value) {
if (value == null) {
// This is a removal ...
flushedRemovals.add(k);
flushedEntries.remove(k);
flushedRemovals.add(key);
flushedEntries.remove(key);
} else {
// This is a normal add
flushedEntries.put(k, (V) value);
flushedRemovals.remove(k);
flushedEntries.put(key, value);
flushedRemovals.remove(key);
}
}
private void restoreEntries(StateRestoreCallback func) {
private void restoreEntries(StateRestoreCallback func, Serdes<K, V> serdes) {
for (KeyValue<K, V> entry : restorableEntries) {
if (entry != null) {
byte[] rawKey = serdes.rawKey(entry.key);
@ -439,6 +454,13 @@ public class KeyValueStoreTestDriver<K, V> {
return flushedRemovals.contains(key);
}
/**
* Return the number of removed entries.
*/
public int numFlushedEntryRemoved() {
return flushedRemovals.size();
}
/**
* Remove all {@link #flushedEntryStored(Object) flushed entries}, {@link #flushedEntryRemoved(Object) flushed removals},
*/
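Note: the reworked send() above exists because logged stores now hand raw byte[] records to the RecordCollector, so the driver has to map them back to typed keys and values before recording flushes. Below is a minimal round trip through the same Serdes methods this file uses (rawKey/rawValue and keyFrom/valueFrom); the topic name and values are made up for illustration.

import org.apache.kafka.streams.state.Serdes;

// Sketch only: the raw-to-typed round trip the test driver performs when a logged
// store flushes byte[] records through the collector.
public final class SerdesRoundTripSketch {

    public static void main(String[] args) {
        Serdes<Integer, String> serdes = Serdes.withBuiltinTypes("topic", Integer.class, String.class);

        byte[] rawKey = serdes.rawKey(42);           // what the logged store sends
        byte[] rawValue = serdes.rawValue("answer");

        Integer key = serdes.keyFrom(rawKey);        // what the driver records for assertions
        String value = serdes.valueFrom(rawValue);

        System.out.println(key + " -> " + value);    // 42 -> answer
    }
}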

View File

@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicLong;
/**
* A utility for tests to create and manage unique and isolated directories on the file system for local state.
*/
public class StateUtils {
public class StateTestUtils {
private static final AtomicLong INSTANCE_COUNTER = new AtomicLong();

View File

@ -37,7 +37,7 @@ public abstract class AbstractKeyValueStoreTest {
@Test
public void testPutGetRange() {
// Create the test driver ...
KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, false);
try {

View File

@ -16,144 +16,95 @@
*/
package org.apache.kafka.streams.state.internals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.KeyValueStoreTestDriver;
import org.apache.kafka.streams.state.Stores;
import org.junit.Test;
public class InMemoryLRUCacheStoreTest {
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class InMemoryLRUCacheStoreTest extends AbstractKeyValueStoreTest {
@SuppressWarnings("unchecked")
@Test
public void testPutGetRange() {
// Create the test driver ...
KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
StateStoreSupplier supplier = Stores.create("my-store")
.withIntegerKeys().withStringValues()
.inMemory().maxEntries(3)
.build();
KeyValueStore<Integer, String> store = (KeyValueStore<Integer, String>) supplier.get();
store.init(driver.context());
@Override
protected <K, V> KeyValueStore<K, V> createKeyValueStore(
ProcessorContext context,
Class<K> keyClass,
Class<V> valueClass,
boolean useContextSerdes) {
// Verify that the store reads and writes correctly, keeping only the last 3 entries ...
store.put(0, "zero");
store.put(1, "one");
store.put(2, "two");
store.put(3, "three");
store.put(4, "four");
store.put(5, "five");
StateStoreSupplier supplier;
if (useContextSerdes) {
Serializer<K> keySer = (Serializer<K>) context.keySerializer();
Deserializer<K> keyDeser = (Deserializer<K>) context.keyDeserializer();
Serializer<V> valSer = (Serializer<V>) context.valueSerializer();
Deserializer<V> valDeser = (Deserializer<V>) context.valueDeserializer();
supplier = Stores.create("my-store").withKeys(keySer, keyDeser).withValues(valSer, valDeser).inMemory().maxEntries(10).build();
} else {
supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).inMemory().maxEntries(10).build();
}
// It should only keep the last 3 added ...
assertEquals(3, driver.sizeOf(store));
assertNull(store.get(0));
assertNull(store.get(1));
assertNull(store.get(2));
assertEquals("three", store.get(3));
assertEquals("four", store.get(4));
assertEquals("five", store.get(5));
store.delete(5);
// Flush the store and verify all current entries were properly flushed ...
store.flush();
assertNull(driver.flushedEntryStored(0));
assertNull(driver.flushedEntryStored(1));
assertNull(driver.flushedEntryStored(2));
assertEquals("three", driver.flushedEntryStored(3));
assertEquals("four", driver.flushedEntryStored(4));
assertNull(driver.flushedEntryStored(5));
assertEquals(true, driver.flushedEntryRemoved(0));
assertEquals(true, driver.flushedEntryRemoved(1));
assertEquals(true, driver.flushedEntryRemoved(2));
assertEquals(false, driver.flushedEntryRemoved(3));
assertEquals(false, driver.flushedEntryRemoved(4));
assertEquals(true, driver.flushedEntryRemoved(5));
}
@SuppressWarnings("unchecked")
@Test
public void testPutGetRangeWithDefaultSerdes() {
// Create the test driver ...
KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
Serializer<Integer> keySer = (Serializer<Integer>) driver.context().keySerializer();
Deserializer<Integer> keyDeser = (Deserializer<Integer>) driver.context().keyDeserializer();
Serializer<String> valSer = (Serializer<String>) driver.context().valueSerializer();
Deserializer<String> valDeser = (Deserializer<String>) driver.context().valueDeserializer();
StateStoreSupplier supplier = Stores.create("my-store")
.withKeys(keySer, keyDeser)
.withValues(valSer, valDeser)
.inMemory().maxEntries(3)
.build();
KeyValueStore<Integer, String> store = (KeyValueStore<Integer, String>) supplier.get();
store.init(driver.context());
// Verify that the store reads and writes correctly, keeping only the last 3 entries ...
store.put(0, "zero");
store.put(1, "one");
store.put(2, "two");
store.put(3, "three");
store.put(4, "four");
store.put(5, "five");
// It should only keep the last 3 added ...
assertEquals(3, driver.sizeOf(store));
assertNull(store.get(0));
assertNull(store.get(1));
assertNull(store.get(2));
assertEquals("three", store.get(3));
assertEquals("four", store.get(4));
assertEquals("five", store.get(5));
store.delete(5);
// Flush the store and verify all current entries were properly flushed ...
store.flush();
assertNull(driver.flushedEntryStored(0));
assertNull(driver.flushedEntryStored(1));
assertNull(driver.flushedEntryStored(2));
assertEquals("three", driver.flushedEntryStored(3));
assertEquals("four", driver.flushedEntryStored(4));
assertNull(driver.flushedEntryStored(5));
assertEquals(true, driver.flushedEntryRemoved(0));
assertEquals(true, driver.flushedEntryRemoved(1));
assertEquals(true, driver.flushedEntryRemoved(2));
assertEquals(false, driver.flushedEntryRemoved(3));
assertEquals(false, driver.flushedEntryRemoved(4));
assertEquals(true, driver.flushedEntryRemoved(5));
KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
store.init(context);
return store;
}
@Test
public void testRestore() {
public void testEvict() {
// Create the test driver ...
KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, false);
// Add any entries that will be restored to any store
// that uses the driver's context ...
driver.addEntryToRestoreLog(1, "one");
driver.addEntryToRestoreLog(2, "two");
driver.addEntryToRestoreLog(4, "four");
try {
store.put(0, "zero");
store.put(1, "one");
store.put(2, "two");
store.put(3, "three");
store.put(4, "four");
store.put(5, "five");
store.put(6, "six");
store.put(7, "seven");
store.put(8, "eight");
store.put(9, "nine");
assertEquals(10, driver.sizeOf(store));
// Create the store, which should register with the context and automatically
// receive the restore entries ...
StateStoreSupplier supplier = Stores.create("my-store")
.withIntegerKeys().withStringValues()
.inMemory().maxEntries(3)
.build();
KeyValueStore<Integer, String> store = (KeyValueStore<Integer, String>) supplier.get();
store.init(driver.context());
store.put(10, "ten");
store.flush();
assertEquals(10, driver.sizeOf(store));
assertTrue(driver.flushedEntryRemoved(0));
assertEquals(1, driver.numFlushedEntryRemoved());
// Verify that the store's contents were properly restored ...
assertEquals(0, driver.checkForRestoredEntries(store));
store.delete(1);
store.flush();
assertEquals(9, driver.sizeOf(store));
assertTrue(driver.flushedEntryRemoved(0));
assertTrue(driver.flushedEntryRemoved(1));
assertEquals(2, driver.numFlushedEntryRemoved());
// and there are no other entries ...
assertEquals(3, driver.sizeOf(store));
store.put(11, "eleven");
store.flush();
assertEquals(10, driver.sizeOf(store));
assertEquals(2, driver.numFlushedEntryRemoved());
store.put(2, "two-again");
store.flush();
assertEquals(10, driver.sizeOf(store));
assertEquals(2, driver.numFlushedEntryRemoved());
store.put(12, "twelve");
store.flush();
assertEquals(10, driver.sizeOf(store));
assertTrue(driver.flushedEntryRemoved(0));
assertTrue(driver.flushedEntryRemoved(1));
assertTrue(driver.flushedEntryRemoved(2));
assertEquals(3, driver.numFlushedEntryRemoved());
} finally {
store.close();
}
}
}

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
public class OffsetCheckpointTest {
private final String topic = "topic";
private OffsetCheckpoint checkpoint = null;
@Test
public void testReadWrite() throws IOException {
File f = new File("/tmp/kafka-streams/offset_checkpoint.test");
checkpoint = new OffsetCheckpoint(f);
try {
Map<TopicPartition, Long> offsets = new HashMap<>();
offsets.put(new TopicPartition(topic, 0), 0L);
offsets.put(new TopicPartition(topic, 1), 1L);
offsets.put(new TopicPartition(topic, 2), 2L);
checkpoint.write(offsets);
assertEquals(offsets, checkpoint.read());
checkpoint.delete();
assertFalse(f.exists());
offsets.put(new TopicPartition(topic, 3), 3L);
checkpoint.write(offsets);
assertEquals(offsets, checkpoint.read());
} finally {
checkpoint.delete();
}
}
}

View File

@ -39,9 +39,9 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
Deserializer<K> keyDeser = (Deserializer<K>) context.keyDeserializer();
Serializer<V> valSer = (Serializer<V>) context.valueSerializer();
Deserializer<V> valDeser = (Deserializer<V>) context.valueDeserializer();
supplier = Stores.create("my-store").withKeys(keySer, keyDeser).withValues(valSer, valDeser).localDatabase().build();
supplier = Stores.create("my-store").withKeys(keySer, keyDeser).withValues(valSer, valDeser).persistent().build();
} else {
supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).localDatabase().build();
supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).persistent().build();
}
KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();

View File

@ -31,7 +31,7 @@ import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.state.Serdes;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.WindowStoreUtil;
import org.apache.kafka.streams.state.WindowStoreUtils;
import org.apache.kafka.test.MockProcessorContext;
import org.junit.Test;
@ -58,8 +58,10 @@ public class RocksDBWindowStoreTest {
private final long windowSize = 3;
private final Serdes<Integer, String> serdes = Serdes.withBuiltinTypes("", Integer.class, String.class);
@SuppressWarnings("unchecked")
protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context, Serdes<K, V> serdes) {
StateStoreSupplier supplier = new RocksDBWindowStoreSupplier<>("window", retentionPeriod, numSegments, true, serdes, null);
WindowStore<K, V> store = (WindowStore<K, V>) supplier.get();
store.init(context);
return store;
@ -659,8 +661,8 @@ public class RocksDBWindowStoreTest {
HashMap<Integer, Set<String>> entriesByKey = new HashMap<>();
for (KeyValue<byte[], byte[]> entry : changeLog) {
long timestamp = WindowStoreUtil.timestampFromBinaryKey(entry.key);
Integer key = WindowStoreUtil.keyFromBinaryKey(entry.key, serdes);
long timestamp = WindowStoreUtils.timestampFromBinaryKey(entry.key);
Integer key = WindowStoreUtils.keyFromBinaryKey(entry.key, serdes);
String value = entry.value == null ? null : serdes.valueFrom(entry.value);
Set<String> entries = entriesByKey.get(key);

View File

@ -0,0 +1,146 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.kafka.streams.state.internals;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.state.Serdes;
import org.apache.kafka.test.MockProcessorContext;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class StoreChangeLoggerTest {
private final String topic = "topic";
private final Map<Integer, String> logged = new HashMap<>();
private final Map<Integer, String> written = new HashMap<>();
private final ProcessorContext context = new MockProcessorContext(Serdes.withBuiltinTypes(topic, Integer.class, String.class),
new RecordCollector(null) {
@SuppressWarnings("unchecked")
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
logged.put((Integer) record.key(), (String) record.value());
}
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer,
StreamPartitioner<K1, V1> partitioner) {
// ignore partitioner
send(record, keySerializer, valueSerializer);
}
}
);
private final StoreChangeLogger<Integer, String> changeLogger = new StoreChangeLogger<>(topic, context, Serdes.withBuiltinTypes(topic, Integer.class, String.class), 3, 3);
private final StoreChangeLogger<byte[], byte[]> rawChangeLogger = new RawStoreChangeLogger(topic, context, 3, 3);
private final StoreChangeLogger.ValueGetter<Integer, String> getter = new StoreChangeLogger.ValueGetter<Integer, String>() {
@Override
public String get(Integer key) {
return written.get(key);
}
};
private final StoreChangeLogger.ValueGetter<byte[], byte[]> rawGetter = new StoreChangeLogger.ValueGetter<byte[], byte[]>() {
private IntegerDeserializer deserializer = new IntegerDeserializer();
private StringSerializer serializer = new StringSerializer();
@Override
public byte[] get(byte[] key) {
return serializer.serialize(topic, written.get(deserializer.deserialize(topic, key)));
}
};
@Test
public void testAddRemove() {
written.put(0, "zero");
changeLogger.add(0);
written.put(1, "one");
changeLogger.add(1);
written.put(2, "two");
changeLogger.add(2);
assertEquals(3, changeLogger.numDirty());
assertEquals(0, changeLogger.numRemoved());
changeLogger.delete(0);
changeLogger.delete(1);
written.put(3, "three");
changeLogger.add(3);
assertEquals(2, changeLogger.numDirty());
assertEquals(2, changeLogger.numRemoved());
written.put(0, "zero-again");
changeLogger.add(0);
assertEquals(3, changeLogger.numDirty());
assertEquals(1, changeLogger.numRemoved());
written.put(4, "four");
changeLogger.add(4);
changeLogger.maybeLogChange(getter);
assertEquals(0, changeLogger.numDirty());
assertEquals(0, changeLogger.numRemoved());
assertEquals(5, logged.size());
assertEquals("zero-again", logged.get(0));
assertEquals(null, logged.get(1));
assertEquals("two", logged.get(2));
assertEquals("three", logged.get(3));
assertEquals("four", logged.get(4));
}
@Test
public void testRaw() {
IntegerSerializer serializer = new IntegerSerializer();
written.put(0, "zero");
rawChangeLogger.add(serializer.serialize(topic, 0));
written.put(1, "one");
rawChangeLogger.add(serializer.serialize(topic, 1));
written.put(2, "two");
rawChangeLogger.add(serializer.serialize(topic, 2));
assertEquals(3, rawChangeLogger.numDirty());
assertEquals(0, rawChangeLogger.numRemoved());
rawChangeLogger.delete(serializer.serialize(topic, 0));
rawChangeLogger.delete(serializer.serialize(topic, 1));
written.put(3, "three");
rawChangeLogger.add(serializer.serialize(topic, 3));
assertEquals(2, rawChangeLogger.numDirty());
assertEquals(2, rawChangeLogger.numRemoved());
written.put(0, "zero-again");
rawChangeLogger.add(serializer.serialize(topic, 0));
assertEquals(3, rawChangeLogger.numDirty());
assertEquals(1, rawChangeLogger.numRemoved());
}
}

View File

@ -27,6 +27,7 @@ import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.state.Serdes;
import java.io.File;
import java.util.Collections;
@ -49,6 +50,16 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
long timestamp = -1L;
public MockProcessorContext(Serdes<?, ?> serdes, RecordCollector collector) {
this(null, null, serdes.keySerializer(), serdes.keyDeserializer(), serdes.valueSerializer(), serdes.valueDeserializer(), collector);
}
public MockProcessorContext(Serializer<?> keySerializer, Deserializer<?> keyDeserializer,
Serializer<?> valueSerializer, Deserializer<?> valueDeserializer,
RecordCollector collector) {
this(null, null, keySerializer, keyDeserializer, valueSerializer, valueDeserializer, collector);
}
public MockProcessorContext(KStreamTestDriver driver, File stateDir,
Serializer<?> keySerializer, Deserializer<?> keyDeserializer,
Serializer<?> valueSerializer, Deserializer<?> valueDeserializer,

View File

@ -49,9 +49,9 @@ public class MockStateStoreSupplier implements StateStoreSupplier {
@Override
public StateStore get() {
if (loggingEnabled) {
return new MockStateStore(name, persistent);
return new MockStateStore(name, persistent).enableLogging();
} else {
return new MockStateStore(name, persistent).disableLogging();
return new MockStateStore(name, persistent);
}
}
@ -59,7 +59,7 @@ public class MockStateStoreSupplier implements StateStoreSupplier {
private final String name;
private final boolean persistent;
public boolean loggingEnabled = true;
public boolean loggingEnabled = false;
public boolean initialized = false;
public boolean flushed = false;
public boolean closed = false;
@ -70,8 +70,8 @@ public class MockStateStoreSupplier implements StateStoreSupplier {
this.persistent = persistent;
}
public MockStateStore disableLogging() {
loggingEnabled = false;
public MockStateStore enableLogging() {
loggingEnabled = true;
return this;
}