KAFAK-3522: add API to create timestamped stores (#6601)

Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Bruno Cadonna <bruno@confluent.io>, Guozhang Wang <guozhang@confluent.io>
This commit is contained in:
Matthias J. Sax 2019-05-01 09:28:10 +02:00 committed by GitHub
parent 626fbc60dd
commit c5665e6945
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 939 additions and 166 deletions

View File

@ -30,6 +30,8 @@ import org.apache.kafka.streams.state.internals.RocksDbKeyValueBytesStoreSupplie
import org.apache.kafka.streams.state.internals.RocksDbSessionBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
import org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilder;
import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder;
import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
import java.time.Duration;
@ -75,21 +77,47 @@ import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFail
* }</pre>
*/
@InterfaceStability.Evolving
public class Stores {
public final class Stores {
/**
* Create a persistent {@link KeyValueBytesStoreSupplier}.
* <p>
* This store supplier can be passed into a {@link #keyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}.
* If you want to create a {@link TimestampedKeyValueStore} you should use
* {@link #persistentTimestampedKeyValueStore(String)} to create a store supplier instead.
*
* @param name name of the store (cannot be {@code null})
* @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
* to build a persistent store
* to build a persistent key-value store
*/
public static KeyValueBytesStoreSupplier persistentKeyValueStore(final String name) {
Objects.requireNonNull(name, "name cannot be null");
return new RocksDbKeyValueBytesStoreSupplier(name, false);
}
/**
* Create a persistent {@link KeyValueBytesStoreSupplier}.
* <p>
* This store supplier can be passed into a
* {@link #timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}.
* If you want to create a {@link KeyValueStore} you should use
* {@link #persistentKeyValueStore(String)} to create a store supplier instead.
*
* @param name name of the store (cannot be {@code null})
* @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
* to build a persistent key-(timestamp/value) store
*/
public static KeyValueBytesStoreSupplier persistentTimestampedKeyValueStore(final String name) {
Objects.requireNonNull(name, "name cannot be null");
return new RocksDbKeyValueBytesStoreSupplier(name, true);
}
/**
* Create an in-memory {@link KeyValueBytesStoreSupplier}.
* <p>
* This store supplier can be passed into a {@link #keyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}
* or {@link #timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}.
*
* @param name name of the store (cannot be {@code null})
* @return an instance of a {@link KeyValueBytesStoreSupplier} than can be used to
* build an in-memory store
@ -116,6 +144,10 @@ public class Stores {
/**
* Create a LRU Map {@link KeyValueBytesStoreSupplier}.
* <p>
* This store supplier can be passed into a {@link #keyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}
* or {@link #timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}.
*
* @param name name of the store (cannot be {@code null})
* @param maxCacheSize maximum number of items in the LRU (cannot be negative)
* @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used to build
@ -145,49 +177,13 @@ public class Stores {
}
/**
* Create an in-memory {@link WindowBytesStoreSupplier}.
* Create a persistent {@link WindowBytesStoreSupplier}.
*
* @param name name of the store (cannot be {@code null})
* @param retentionPeriod length of time to retain data in the store (cannot be negative)
* Note that the retention period must be at least long enough to contain the
* (note that the retention period must be at least long enough to contain the
* windowed data's entire life cycle, from window-start through window-end,
* and for the entire grace period.
* @param windowSize size of the windows (cannot be negative)
* @return an instance of {@link WindowBytesStoreSupplier}
* @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
*/
public static WindowBytesStoreSupplier inMemoryWindowStore(final String name,
final Duration retentionPeriod,
final Duration windowSize,
final boolean retainDuplicates) throws IllegalArgumentException {
Objects.requireNonNull(name, "name cannot be null");
final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
final long retentionMs = ApiUtils.validateMillisecondDuration(retentionPeriod, rpMsgPrefix);
final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, "windowSize");
final long windowSizeMs = ApiUtils.validateMillisecondDuration(windowSize, wsMsgPrefix);
Objects.requireNonNull(name, "name cannot be null");
if (retentionMs < 0L) {
throw new IllegalArgumentException("retentionPeriod cannot be negative");
}
if (windowSizeMs < 0L) {
throw new IllegalArgumentException("windowSize cannot be negative");
}
if (windowSizeMs > retentionMs) {
throw new IllegalArgumentException("The retention period of the window store "
+ name + " must be no smaller than its window size. Got size=["
+ windowSize + "], retention=[" + retentionPeriod + "]");
}
return new InMemoryWindowBytesStoreSupplier(name, retentionMs, windowSizeMs, retainDuplicates);
}
/**
* Create a persistent {@link WindowBytesStoreSupplier}.
* @param name name of the store (cannot be {@code null})
* @param retentionPeriod length of time to retain data in the store (cannot be negative).
* Note that the retention period must be at least long enough to contain the
* windowed data's entire life cycle, from window-start through window-end,
* and for the entire grace period.
* and for the entire grace period)
* @param numSegments number of db segments (cannot be zero or negative)
* @param windowSize size of the windows that are stored (cannot be negative). Note: the window size
* is not stored with the records, so this value is used to compute the keys that
@ -214,17 +210,23 @@ public class Stores {
retentionPeriod,
windowSize,
retainDuplicates,
legacySegmentInterval
legacySegmentInterval,
false
);
}
/**
* Create a persistent {@link WindowBytesStoreSupplier}.
* <p>
* This store supplier can be passed into a {@link #windowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)}.
* If you want to create a {@link TimestampedWindowStore} you should use
* {@link #persistentTimestampedWindowStore(String, Duration, Duration, boolean)} to create a store supplier instead.
*
* @param name name of the store (cannot be {@code null})
* @param retentionPeriod length of time to retain data in the store (cannot be negative)
* Note that the retention period must be at least long enough to contain the
* (note that the retention period must be at least long enough to contain the
* windowed data's entire life cycle, from window-start through window-end,
* and for the entire grace period.
* and for the entire grace period)
* @param windowSize size of the windows (cannot be negative)
* @param retainDuplicates whether or not to retain duplicates.
* @return an instance of {@link WindowBytesStoreSupplier}
@ -234,6 +236,39 @@ public class Stores {
final Duration retentionPeriod,
final Duration windowSize,
final boolean retainDuplicates) throws IllegalArgumentException {
return persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates, false);
}
/**
* Create a persistent {@link WindowBytesStoreSupplier}.
* <p>
* This store supplier can be passed into a
* {@link #timestampedWindowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)}.
* If you want to create a {@link WindowStore} you should use
* {@link #persistentWindowStore(String, Duration, Duration, boolean)} to create a store supplier instead.
*
* @param name name of the store (cannot be {@code null})
* @param retentionPeriod length of time to retain data in the store (cannot be negative)
* (note that the retention period must be at least long enough to contain the
* windowed data's entire life cycle, from window-start through window-end,
* and for the entire grace period)
* @param windowSize size of the windows (cannot be negative)
* @param retainDuplicates whether or not to retain duplicates.
* @return an instance of {@link WindowBytesStoreSupplier}
* @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
*/
public static WindowBytesStoreSupplier persistentTimestampedWindowStore(final String name,
final Duration retentionPeriod,
final Duration windowSize,
final boolean retainDuplicates) throws IllegalArgumentException {
return persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates, true);
}
private static WindowBytesStoreSupplier persistentWindowStore(final String name,
final Duration retentionPeriod,
final Duration windowSize,
final boolean retainDuplicates,
final boolean timestampedStore) {
Objects.requireNonNull(name, "name cannot be null");
final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
final long retentionMs = ApiUtils.validateMillisecondDuration(retentionPeriod, rpMsgPrefix);
@ -242,14 +277,15 @@ public class Stores {
final long defaultSegmentInterval = Math.max(retentionMs / 2, 60_000L);
return persistentWindowStore(name, retentionMs, windowSizeMs, retainDuplicates, defaultSegmentInterval);
return persistentWindowStore(name, retentionMs, windowSizeMs, retainDuplicates, defaultSegmentInterval, timestampedStore);
}
private static WindowBytesStoreSupplier persistentWindowStore(final String name,
final long retentionPeriod,
final long windowSize,
final boolean retainDuplicates,
final long segmentInterval) {
final long segmentInterval,
final boolean timestampedStore) {
Objects.requireNonNull(name, "name cannot be null");
if (retentionPeriod < 0L) {
throw new IllegalArgumentException("retentionPeriod cannot be negative");
@ -272,7 +308,49 @@ public class Stores {
segmentInterval,
windowSize,
retainDuplicates,
false);
timestampedStore);
}
/**
* Create an in-memory {@link WindowBytesStoreSupplier}.
* <p>
* This store supplier can be passed into a {@link #windowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)} or
* {@link #timestampedWindowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)}.
*
* @param name name of the store (cannot be {@code null})
* @param retentionPeriod length of time to retain data in the store (cannot be negative)
* Note that the retention period must be at least long enough to contain the
* windowed data's entire life cycle, from window-start through window-end,
* and for the entire grace period.
* @param windowSize size of the windows (cannot be negative)
* @return an instance of {@link WindowBytesStoreSupplier}
* @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
*/
public static WindowBytesStoreSupplier inMemoryWindowStore(final String name,
final Duration retentionPeriod,
final Duration windowSize,
final boolean retainDuplicates) throws IllegalArgumentException {
Objects.requireNonNull(name, "name cannot be null");
final String repartitionPeriodErrorMessagePrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
final long retentionMs = ApiUtils.validateMillisecondDuration(retentionPeriod, repartitionPeriodErrorMessagePrefix);
if (retentionMs < 0L) {
throw new IllegalArgumentException("retentionPeriod cannot be negative");
}
final String windowSizeErrorMessagePrefix = prepareMillisCheckFailMsgPrefix(windowSize, "windowSize");
final long windowSizeMs = ApiUtils.validateMillisecondDuration(windowSize, windowSizeErrorMessagePrefix);
if (windowSizeMs < 0L) {
throw new IllegalArgumentException("windowSize cannot be negative");
}
if (windowSizeMs > retentionMs) {
throw new IllegalArgumentException("The retention period of the window store "
+ name + " must be no smaller than its window size. Got size=["
+ windowSize + "], retention=[" + retentionPeriod + "]");
}
return new InMemoryWindowBytesStoreSupplier(name, retentionMs, windowSizeMs, retainDuplicates);
}
/**
@ -297,11 +375,12 @@ public class Stores {
/**
* Create a persistent {@link SessionBytesStoreSupplier}.
*
* @param name name of the store (cannot be {@code null})
* @param retentionPeriodMs length ot time to retain data in the store (cannot be negative)
* Note that the retention period must be at least long enough to contain the
* (note that the retention period must be at least long enough to contain the
* windowed data's entire life cycle, from window-start through window-end,
* and for the entire grace period.
* and for the entire grace period)
* @return an instance of a {@link SessionBytesStoreSupplier}
* @deprecated since 2.1 Use {@link Stores#persistentSessionStore(String, Duration)} instead
*/
@ -317,6 +396,7 @@ public class Stores {
/**
* Create a persistent {@link SessionBytesStoreSupplier}.
*
* @param name name of the store (cannot be {@code null})
* @param retentionPeriod length ot time to retain data in the store (cannot be negative)
* Note that the retention period must be at least long enough to contain the
@ -331,29 +411,15 @@ public class Stores {
return persistentSessionStore(name, ApiUtils.validateMillisecondDuration(retentionPeriod, msgPrefix));
}
/**
* Creates a {@link StoreBuilder} that can be used to build a {@link WindowStore}.
* @param supplier a {@link WindowBytesStoreSupplier} (cannot be {@code null})
* @param keySerde the key serde to use
* @param valueSerde the value serde to use; if the serialized bytes is null for put operations,
* it is treated as delete
* @param <K> key type
* @param <V> value type
* @return an instance of {@link StoreBuilder} than can build a {@link WindowStore}
*/
public static <K, V> StoreBuilder<WindowStore<K, V>> windowStoreBuilder(final WindowBytesStoreSupplier supplier,
final Serde<K> keySerde,
final Serde<V> valueSerde) {
Objects.requireNonNull(supplier, "supplier cannot be null");
return new WindowStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM);
}
/**
* Creates a {@link StoreBuilder} than can be used to build a {@link KeyValueStore}.
* Creates a {@link StoreBuilder} that can be used to build a {@link KeyValueStore}.
* <p>
* The provided supplier should <strong>not</strong> be a supplier for
* {@link TimestampedKeyValueStore TimestampedKeyValueStores}.
*
* @param supplier a {@link KeyValueBytesStoreSupplier} (cannot be {@code null})
* @param keySerde the key serde to use
* @param valueSerde the value serde to use; if the serialized bytes is null for put operations,
* @param valueSerde the value serde to use; if the serialized bytes is {@code null} for put operations,
* it is treated as delete
* @param <K> key type
* @param <V> value type
@ -366,16 +432,82 @@ public class Stores {
return new KeyValueStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM);
}
/**
* Creates a {@link StoreBuilder} that can be used to build a {@link TimestampedKeyValueStore}.
* <p>
* The provided supplier should <strong>not</strong> be a supplier for
* {@link KeyValueStore KeyValueStores}. For this case, passed in timestamps will be dropped and not stored in the
* key-value-store. On read, no valid timestamp but a dummy timestamp will be returned.
*
* @param supplier a {@link KeyValueBytesStoreSupplier} (cannot be {@code null})
* @param keySerde the key serde to use
* @param valueSerde the value serde to use; if the serialized bytes is {@code null} for put operations,
* it is treated as delete
* @param <K> key type
* @param <V> value type
* @return an instance of a {@link StoreBuilder} that can build a {@link KeyValueStore}
*/
public static <K, V> StoreBuilder<TimestampedKeyValueStore<K, V>> timestampedKeyValueStoreBuilder(final KeyValueBytesStoreSupplier supplier,
final Serde<K> keySerde,
final Serde<V> valueSerde) {
Objects.requireNonNull(supplier, "supplier cannot be null");
return new TimestampedKeyValueStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM);
}
/**
* Creates a {@link StoreBuilder} that can be used to build a {@link WindowStore}.
* <p>
* The provided supplier should <strong>not</strong> be a supplier for
* {@link TimestampedWindowStore TimestampedWindowStores}.
*
* @param supplier a {@link WindowBytesStoreSupplier} (cannot be {@code null})
* @param keySerde the key serde to use
* @param valueSerde the value serde to use; if the serialized bytes is {@code null} for put operations,
* it is treated as delete
* @param <K> key type
* @param <V> value type
* @return an instance of {@link StoreBuilder} than can build a {@link WindowStore}
*/
public static <K, V> StoreBuilder<WindowStore<K, V>> windowStoreBuilder(final WindowBytesStoreSupplier supplier,
final Serde<K> keySerde,
final Serde<V> valueSerde) {
Objects.requireNonNull(supplier, "supplier cannot be null");
return new WindowStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM);
}
/**
* Creates a {@link StoreBuilder} that can be used to build a {@link TimestampedWindowStore}.
* <p>
* The provided supplier should <strong>not</strong> be a supplier for
* {@link WindowStore WindowStores}. For this case, passed in timestamps will be dropped and not stored in the
* windows-store. On read, no valid timestamp but a dummy timestamp will be returned.
*
* @param supplier a {@link WindowBytesStoreSupplier} (cannot be {@code null})
* @param keySerde the key serde to use
* @param valueSerde the value serde to use; if the serialized bytes is {@code null} for put operations,
* it is treated as delete
* @param <K> key type
* @param <V> value type
* @return an instance of {@link StoreBuilder} that can build a {@link TimestampedWindowStore}
*/
public static <K, V> StoreBuilder<TimestampedWindowStore<K, V>> timestampedWindowStoreBuilder(final WindowBytesStoreSupplier supplier,
final Serde<K> keySerde,
final Serde<V> valueSerde) {
Objects.requireNonNull(supplier, "supplier cannot be null");
return new TimestampedWindowStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM);
}
/**
* Creates a {@link StoreBuilder} that can be used to build a {@link SessionStore}.
*
* @param supplier a {@link SessionBytesStoreSupplier} (cannot be {@code null})
* @param keySerde the key serde to use
* @param valueSerde the value serde to use; if the serialized bytes is null for put operations,
* @param valueSerde the value serde to use; if the serialized bytes is {@code null} for put operations,
* it is treated as delete
* @param <K> key type
* @param <V> value type
* @return an instance of {@link StoreBuilder} than can build a {@link SessionStore}
* */
*/
public static <K, V> StoreBuilder<SessionStore<K, V>> sessionStoreBuilder(final SessionBytesStoreSupplier supplier,
final Serde<K> keySerde,
final Serde<V> valueSerde) {
@ -383,4 +515,3 @@ public class Stores {
return new SessionStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM);
}
}

View File

@ -22,6 +22,9 @@ import static org.apache.kafka.clients.consumer.ConsumerRecord.NO_TIMESTAMP;
public interface TimestampedBytesStore {
static byte[] convertToTimestampedFormat(final byte[] plainValue) {
if (plainValue == null) {
return null;
}
return ByteBuffer
.allocate(8 + plainValue.length)
.putLong(NO_TIMESTAMP)

View File

@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.List;
import static org.apache.kafka.streams.state.TimestampedBytesStore.convertToTimestampedFormat;
import static org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.rawValue;
/**
* This class is used to ensure backward compatibility at DSL level between
* {@link org.apache.kafka.streams.state.TimestampedKeyValueStore} and {@link KeyValueStore}.
* <p>
* If a user provides a supplier for plain {@code KeyValueStores} via
* {@link org.apache.kafka.streams.kstream.Materialized#as(KeyValueBytesStoreSupplier)} this adapter is used to
* translate between old a new {@code byte[]} format of the value.
*
* @see KeyValueToTimestampedKeyValueIteratorAdapter
*/
public class KeyValueToTimestampedKeyValueByteStoreAdapter implements KeyValueStore<Bytes, byte[]> {
final KeyValueStore<Bytes, byte[]> store;
KeyValueToTimestampedKeyValueByteStoreAdapter(final KeyValueStore<Bytes, byte[]> store) {
if (!store.persistent()) {
throw new IllegalArgumentException("Provided store must be a persistent store, but it is not.");
}
this.store = store;
}
@Override
public void put(final Bytes key,
final byte[] valueWithTimestamp) {
store.put(key, valueWithTimestamp == null ? null : rawValue(valueWithTimestamp));
}
@Override
public byte[] putIfAbsent(final Bytes key,
final byte[] valueWithTimestamp) {
return convertToTimestampedFormat(store.putIfAbsent(
key,
valueWithTimestamp == null ? null : rawValue(valueWithTimestamp)));
}
@Override
public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
for (final KeyValue<Bytes, byte[]> entry : entries) {
final byte[] valueWithTimestamp = entry.value;
store.put(entry.key, valueWithTimestamp == null ? null : rawValue(valueWithTimestamp));
}
}
@Override
public byte[] delete(final Bytes key) {
return convertToTimestampedFormat(store.delete(key));
}
@Override
public String name() {
return store.name();
}
@Override
public void init(final ProcessorContext context,
final StateStore root) {
store.init(context, root);
}
@Override
public void flush() {
store.flush();
}
@Override
public void close() {
store.close();
}
@Override
public boolean persistent() {
return true;
}
@Override
public boolean isOpen() {
return store.isOpen();
}
@Override
public byte[] get(final Bytes key) {
return convertToTimestampedFormat(store.get(key));
}
@Override
public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
final Bytes to) {
return new KeyValueToTimestampedKeyValueIteratorAdapter<>(store.range(from, to));
}
@Override
public KeyValueIterator<Bytes, byte[]> all() {
return new KeyValueToTimestampedKeyValueIteratorAdapter<>(store.all());
}
@Override
public long approximateNumEntries() {
return store.approximateNumEntries();
}
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import static org.apache.kafka.streams.state.TimestampedBytesStore.convertToTimestampedFormat;
/**
* This class is used to ensure backward compatibility at DSL level between
* {@link org.apache.kafka.streams.state.TimestampedKeyValueStore} and
* {@link org.apache.kafka.streams.state.KeyValueStore}.
*
* @see KeyValueToTimestampedKeyValueByteStoreAdapter
*/
class KeyValueToTimestampedKeyValueIteratorAdapter<K> implements KeyValueIterator<K, byte[]> {
private final KeyValueIterator<K, byte[]> innerIterator;
KeyValueToTimestampedKeyValueIteratorAdapter(final KeyValueIterator<K, byte[]> innerIterator) {
this.innerIterator = innerIterator;
}
@Override
public void close() {
innerIterator.close();
}
@Override
public K peekNextKey() {
return innerIterator.peekNextKey();
}
@Override
public boolean hasNext() {
return innerIterator.hasNext();
}
@Override
public KeyValue<K, byte[]> next() {
final KeyValue<K, byte[]> plainKeyValue = innerIterator.next();
return KeyValue.pair(plainKeyValue.key, convertToTimestampedFormat(plainKeyValue.value));
}
}

View File

@ -22,6 +22,7 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedBytesStore;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
@ -42,12 +43,12 @@ import java.util.NoSuchElementException;
import java.util.Objects;
import static java.util.Arrays.asList;
import static org.apache.kafka.streams.state.internals.StoreProxyUtils.getValueWithUnknownTimestamp;
import static org.apache.kafka.streams.state.TimestampedBytesStore.convertToTimestampedFormat;
/**
* A persistent key-(value-timestamp) store based on RocksDB.
*/
public class RocksDBTimestampedStore extends RocksDBStore {
public class RocksDBTimestampedStore extends RocksDBStore implements TimestampedBytesStore {
private static final Logger log = LoggerFactory.getLogger(RocksDBTimestampedStore.class);
RocksDBTimestampedStore(final String name) {
@ -160,7 +161,7 @@ public class RocksDBTimestampedStore extends RocksDBStore {
final byte[] plainValue = db.get(oldColumnFamily, key);
if (plainValue != null) {
final byte[] valueWithUnknownTimestamp = getValueWithUnknownTimestamp(plainValue);
final byte[] valueWithUnknownTimestamp = convertToTimestampedFormat(plainValue);
// this does only work, because the changelog topic contains correct data already
// for other format changes, we cannot take this short cut and can only migrate data
// from old to new store on put()
@ -180,7 +181,7 @@ public class RocksDBTimestampedStore extends RocksDBStore {
final byte[] plainValue = db.get(oldColumnFamily, key);
if (plainValue != null) {
return getValueWithUnknownTimestamp(plainValue);
return convertToTimestampedFormat(plainValue);
}
return null;
@ -319,12 +320,12 @@ public class RocksDBTimestampedStore extends RocksDBStore {
}
} else {
if (nextWithTimestamp == null) {
next = KeyValue.pair(new Bytes(nextNoTimestamp), getValueWithUnknownTimestamp(iterNoTimestamp.value()));
next = KeyValue.pair(new Bytes(nextNoTimestamp), convertToTimestampedFormat(iterNoTimestamp.value()));
nextNoTimestamp = null;
iterNoTimestamp.next();
} else {
if (comparator.compare(nextNoTimestamp, nextWithTimestamp) <= 0) {
next = KeyValue.pair(new Bytes(nextNoTimestamp), getValueWithUnknownTimestamp(iterNoTimestamp.value()));
next = KeyValue.pair(new Bytes(nextNoTimestamp), convertToTimestampedFormat(iterNoTimestamp.value()));
nextNoTimestamp = null;
iterNoTimestamp.next();
} else {

View File

@ -16,21 +16,14 @@
*/
package org.apache.kafka.streams.state.internals;
import java.nio.ByteBuffer;
import org.apache.kafka.streams.state.TimestampedBytesStore;
import static org.apache.kafka.clients.consumer.ConsumerRecord.NO_TIMESTAMP;
class RocksDBTimestampedWindowStore extends RocksDBWindowStore implements TimestampedBytesStore {
class StoreProxyUtils {
static byte[] getValueWithUnknownTimestamp(final byte[] rawValue) {
if (rawValue == null) {
return null;
}
return ByteBuffer
.allocate(8 + rawValue.length)
.putLong(NO_TIMESTAMP)
.put(rawValue)
.array();
RocksDBTimestampedWindowStore(final SegmentedBytesStore bytesStore,
final boolean retainDuplicates,
final long windowSize) {
super(bytesStore, retainDuplicates, windowSize);
}
}

View File

@ -49,28 +49,27 @@ public class RocksDbWindowBytesStoreSupplier implements WindowBytesStoreSupplier
@Override
public WindowStore<Bytes, byte[]> get() {
final SegmentedBytesStore segmentedBytesStore;
if (!returnTimestampedStore) {
segmentedBytesStore = new RocksDBSegmentedBytesStore(
name,
metricsScope(),
retentionPeriod,
segmentInterval,
new WindowKeySchema()
);
} else {
segmentedBytesStore = new RocksDBTimestampedSegmentedBytesStore(
name,
metricsScope(),
retentionPeriod,
segmentInterval,
new WindowKeySchema()
);
}
return new RocksDBWindowStore(
segmentedBytesStore,
new RocksDBSegmentedBytesStore(
name,
metricsScope(),
retentionPeriod,
segmentInterval,
new WindowKeySchema()),
retainDuplicates,
windowSize);
} else {
return new RocksDBTimestampedWindowStore(
new RocksDBTimestampedSegmentedBytesStore(
name,
metricsScope(),
retentionPeriod,
segmentInterval,
new WindowKeySchema()),
retainDuplicates,
windowSize);
}
}
@Override

View File

@ -21,6 +21,7 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.TimestampedBytesStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
@ -46,8 +47,12 @@ public class TimestampedKeyValueStoreBuilder<K, V>
@Override
public TimestampedKeyValueStore<K, V> build() {
KeyValueStore<Bytes, byte[]> store = storeSupplier.get();
if (!(store instanceof TimestampedBytesStore) && store.persistent()) {
store = new KeyValueToTimestampedKeyValueByteStoreAdapter(store);
}
return new MeteredTimestampedKeyValueStore<>(
maybeWrapCaching(maybeWrapLogging(storeSupplier.get())),
maybeWrapCaching(maybeWrapLogging(store)),
storeSupplier.metricsScope(),
time,
keySerde,

View File

@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.state.TimestampedBytesStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
@ -42,8 +43,12 @@ public class TimestampedWindowStoreBuilder<K, V>
@Override
public TimestampedWindowStore<K, V> build() {
WindowStore<Bytes, byte[]> store = storeSupplier.get();
if (!(store instanceof TimestampedBytesStore) && store.persistent()) {
store = new WindowToTimestampedWindowByteStoreAdapter(store);
}
return new MeteredTimestampedWindowStore<>(
maybeWrapCaching(maybeWrapLogging(storeSupplier.get())),
maybeWrapCaching(maybeWrapLogging(store)),
storeSupplier.windowSize(),
storeSupplier.metricsScope(),
time,

View File

@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import java.time.Instant;
import static org.apache.kafka.streams.state.TimestampedBytesStore.convertToTimestampedFormat;
import static org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.rawValue;
class WindowToTimestampedWindowByteStoreAdapter implements WindowStore<Bytes, byte[]> {
final WindowStore<Bytes, byte[]> store;
WindowToTimestampedWindowByteStoreAdapter(final WindowStore<Bytes, byte[]> store) {
if (!store.persistent()) {
throw new IllegalArgumentException("Provided store must be a persistent store, but it is not.");
}
this.store = store;
}
@Override
public void put(final Bytes key,
final byte[] valueWithTimestamp) {
store.put(key, valueWithTimestamp == null ? null : rawValue(valueWithTimestamp));
}
@Override
public void put(final Bytes key,
final byte[] valueWithTimestamp,
final long windowStartTimestamp) {
store.put(key, valueWithTimestamp == null ? null : rawValue(valueWithTimestamp), windowStartTimestamp);
}
@Override
public byte[] fetch(final Bytes key,
final long time) {
return convertToTimestampedFormat(store.fetch(key, time));
}
@Override
@SuppressWarnings("deprecation")
public WindowStoreIterator<byte[]> fetch(final Bytes key,
final long timeFrom,
final long timeTo) {
return new WindowToTimestampedWindowIteratorAdapter(store.fetch(key, timeFrom, timeTo));
}
@Override
public WindowStoreIterator<byte[]> fetch(final Bytes key,
final Instant from,
final Instant to) {
return new WindowToTimestampedWindowIteratorAdapter(store.fetch(key, from, to));
}
@Override
@SuppressWarnings("deprecation")
public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
final Bytes to,
final long timeFrom,
final long timeTo) {
return new KeyValueToTimestampedKeyValueIteratorAdapter<>(store.fetch(from, to, timeFrom, timeTo));
}
@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
final Bytes to,
final Instant fromTime,
final Instant toTime) {
return new KeyValueToTimestampedKeyValueIteratorAdapter<>(store.fetch(from, to, fromTime, toTime));
}
@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
return new KeyValueToTimestampedKeyValueIteratorAdapter<>(store.all());
}
@Override
@SuppressWarnings("deprecation")
public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom,
final long timeTo) {
return new KeyValueToTimestampedKeyValueIteratorAdapter<>(store.fetchAll(timeFrom, timeTo));
}
@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final Instant from,
final Instant to) {
return new KeyValueToTimestampedKeyValueIteratorAdapter<>(store.fetchAll(from, to));
}
@Override
public String name() {
return store.name();
}
@Override
public void init(final ProcessorContext context,
final StateStore root) {
store.init(context, root);
}
@Override
public void flush() {
store.flush();
}
@Override
public void close() {
store.close();
}
@Override
public boolean persistent() {
return true;
}
@Override
public boolean isOpen() {
return store.isOpen();
}
private static class WindowToTimestampedWindowIteratorAdapter
extends KeyValueToTimestampedKeyValueIteratorAdapter<Long>
implements WindowStoreIterator<byte[]> {
WindowToTimestampedWindowIteratorAdapter(final KeyValueIterator<Long, byte[]> innerIterator) {
super(innerIterator);
}
}
}

View File

@ -36,75 +36,112 @@ import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.hamcrest.core.IsNot.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
public class StoresTest {
@Test(expected = NullPointerException.class)
@Test
public void shouldThrowIfPersistentKeyValueStoreStoreNameIsNull() {
Stores.persistentKeyValueStore(null);
final Exception e = assertThrows(NullPointerException.class, () -> Stores.persistentKeyValueStore(null));
assertEquals("name cannot be null", e.getMessage());
}
@Test(expected = NullPointerException.class)
@Test
public void shouldThrowIfPersistentTimestampedKeyValueStoreStoreNameIsNull() {
final Exception e = assertThrows(NullPointerException.class, () -> Stores.persistentTimestampedKeyValueStore(null));
assertEquals("name cannot be null", e.getMessage());
}
@Test
public void shouldThrowIfIMemoryKeyValueStoreStoreNameIsNull() {
//noinspection ResultOfMethodCallIgnored
Stores.inMemoryKeyValueStore(null);
final Exception e = assertThrows(NullPointerException.class, () -> Stores.inMemoryKeyValueStore(null));
assertEquals("name cannot be null", e.getMessage());
}
@Test(expected = NullPointerException.class)
@Test
public void shouldThrowIfILruMapStoreNameIsNull() {
Stores.lruMap(null, 0);
final Exception e = assertThrows(NullPointerException.class, () -> Stores.lruMap(null, 0));
assertEquals("name cannot be null", e.getMessage());
}
@Test(expected = IllegalArgumentException.class)
@Test
public void shouldThrowIfILruMapStoreCapacityIsNegative() {
Stores.lruMap("anyName", -1);
final Exception e = assertThrows(IllegalArgumentException.class, () -> Stores.lruMap("anyName", -1));
assertEquals("maxCacheSize cannot be negative", e.getMessage());
}
@Test(expected = NullPointerException.class)
@Test
public void shouldThrowIfIPersistentWindowStoreStoreNameIsNull() {
Stores.persistentWindowStore(null, ZERO, ZERO, false);
final Exception e = assertThrows(NullPointerException.class, () -> Stores.persistentWindowStore(null, ZERO, ZERO, false));
assertEquals("name cannot be null", e.getMessage());
}
@Test(expected = IllegalArgumentException.class)
@Test
public void shouldThrowIfIPersistentTimestampedWindowStoreStoreNameIsNull() {
final Exception e = assertThrows(NullPointerException.class, () -> Stores.persistentTimestampedWindowStore(null, ZERO, ZERO, false));
assertEquals("name cannot be null", e.getMessage());
}
@Test
public void shouldThrowIfIPersistentWindowStoreRetentionPeriodIsNegative() {
Stores.persistentWindowStore("anyName", ofMillis(-1L), ZERO, false);
final Exception e = assertThrows(IllegalArgumentException.class, () -> Stores.persistentWindowStore("anyName", ofMillis(-1L), ZERO, false));
assertEquals("retentionPeriod cannot be negative", e.getMessage());
}
@Test
public void shouldThrowIfIPersistentTimestampedWindowStoreRetentionPeriodIsNegative() {
final Exception e = assertThrows(IllegalArgumentException.class, () -> Stores.persistentTimestampedWindowStore("anyName", ofMillis(-1L), ZERO, false));
assertEquals("retentionPeriod cannot be negative", e.getMessage());
}
@Deprecated
@Test(expected = IllegalArgumentException.class)
@Test
public void shouldThrowIfIPersistentWindowStoreIfNumberOfSegmentsSmallerThanOne() {
Stores.persistentWindowStore("anyName", 0L, 1, 0L, false);
final Exception e = assertThrows(IllegalArgumentException.class, () -> Stores.persistentWindowStore("anyName", 0L, 1, 0L, false));
assertEquals("numSegments cannot be smaller than 2", e.getMessage());
}
@Test(expected = IllegalArgumentException.class)
@Test
public void shouldThrowIfIPersistentWindowStoreIfWindowSizeIsNegative() {
Stores.persistentWindowStore("anyName", ofMillis(0L), ofMillis(-1L), false);
final Exception e = assertThrows(IllegalArgumentException.class, () -> Stores.persistentWindowStore("anyName", ofMillis(0L), ofMillis(-1L), false));
assertEquals("windowSize cannot be negative", e.getMessage());
}
@Test(expected = NullPointerException.class)
@Test
public void shouldThrowIfIPersistentTimestampedWindowStoreIfWindowSizeIsNegative() {
final Exception e = assertThrows(IllegalArgumentException.class, () -> Stores.persistentTimestampedWindowStore("anyName", ofMillis(0L), ofMillis(-1L), false));
assertEquals("windowSize cannot be negative", e.getMessage());
}
@Test
public void shouldThrowIfIPersistentSessionStoreStoreNameIsNull() {
Stores.persistentSessionStore(null, ofMillis(0));
final Exception e = assertThrows(NullPointerException.class, () -> Stores.persistentSessionStore(null, ofMillis(0)));
assertEquals("name cannot be null", e.getMessage());
}
@Test(expected = IllegalArgumentException.class)
@Test
public void shouldThrowIfIPersistentSessionStoreRetentionPeriodIsNegative() {
Stores.persistentSessionStore("anyName", ofMillis(-1));
final Exception e = assertThrows(IllegalArgumentException.class, () -> Stores.persistentSessionStore("anyName", ofMillis(-1)));
assertEquals("retentionPeriod cannot be negative", e.getMessage());
}
@Test(expected = NullPointerException.class)
@Test
public void shouldThrowIfSupplierIsNullForWindowStoreBuilder() {
Stores.windowStoreBuilder(null, Serdes.ByteArray(), Serdes.ByteArray());
final Exception e = assertThrows(NullPointerException.class, () -> Stores.windowStoreBuilder(null, Serdes.ByteArray(), Serdes.ByteArray()));
assertEquals("supplier cannot be null", e.getMessage());
}
@Test(expected = NullPointerException.class)
@Test
public void shouldThrowIfSupplierIsNullForKeyValueStoreBuilder() {
Stores.keyValueStoreBuilder(null, Serdes.ByteArray(), Serdes.ByteArray());
final Exception e = assertThrows(NullPointerException.class, () -> Stores.keyValueStoreBuilder(null, Serdes.ByteArray(), Serdes.ByteArray()));
assertEquals("supplier cannot be null", e.getMessage());
}
@Test(expected = NullPointerException.class)
@Test
public void shouldThrowIfSupplierIsNullForSessionStoreBuilder() {
Stores.sessionStoreBuilder(null, Serdes.ByteArray(), Serdes.ByteArray());
final Exception e = assertThrows(NullPointerException.class, () -> Stores.sessionStoreBuilder(null, Serdes.ByteArray(), Serdes.ByteArray()));
assertEquals("supplier cannot be null", e.getMessage());
}
@Test
@ -124,6 +161,11 @@ public class StoresTest {
allOf(not(instanceOf(RocksDBTimestampedStore.class)), instanceOf(RocksDBStore.class)));
}
@Test
public void shouldCreateRocksDbTimestampedStore() {
assertThat(Stores.persistentTimestampedKeyValueStore("store").get(), instanceOf(RocksDBTimestampedStore.class));
}
@Test
public void shouldCreateRocksDbWindowStore() {
final WindowStore store = Stores.persistentWindowStore("store", ofMillis(1L), ofMillis(1L), false).get();
@ -132,11 +174,49 @@ public class StoresTest {
assertThat(wrapped, allOf(not(instanceOf(RocksDBTimestampedSegmentedBytesStore.class)), instanceOf(RocksDBSegmentedBytesStore.class)));
}
@Test
public void shouldCreateRocksDbTimestampedWindowStore() {
final WindowStore store = Stores.persistentTimestampedWindowStore("store", ofMillis(1L), ofMillis(1L), false).get();
final StateStore wrapped = ((WrappedStateStore) store).wrapped();
assertThat(store, instanceOf(RocksDBWindowStore.class));
assertThat(wrapped, instanceOf(RocksDBTimestampedSegmentedBytesStore.class));
}
@Test
public void shouldCreateRocksDbSessionStore() {
assertThat(Stores.persistentSessionStore("store", ofMillis(1)).get(), instanceOf(RocksDBSessionStore.class));
}
@Test
public void shouldBuildKeyValueStore() {
final KeyValueStore<String, String> store = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("name"),
Serdes.String(),
Serdes.String()
).build();
assertThat(store, not(nullValue()));
}
@Test
public void shouldBuildTimestampedKeyValueStore() {
final TimestampedKeyValueStore<String, String> store = Stores.timestampedKeyValueStoreBuilder(
Stores.persistentTimestampedKeyValueStore("name"),
Serdes.String(),
Serdes.String()
).build();
assertThat(store, not(nullValue()));
}
@Test
public void shouldBuildTimestampedKeyValueStoreThatWrapsKeyValueStore() {
final TimestampedKeyValueStore<String, String> store = Stores.timestampedKeyValueStoreBuilder(
Stores.persistentKeyValueStore("name"),
Serdes.String(),
Serdes.String()
).build();
assertThat(store, not(nullValue()));
}
@Test
public void shouldBuildWindowStore() {
final WindowStore<String, String> store = Stores.windowStoreBuilder(
@ -148,9 +228,9 @@ public class StoresTest {
}
@Test
public void shouldBuildKeyValueStore() {
final KeyValueStore<String, String> store = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("name"),
public void shouldBuildTimestampedWindowStore() {
final TimestampedWindowStore<String, String> store = Stores.timestampedWindowStoreBuilder(
Stores.persistentTimestampedWindowStore("store", ofMillis(3L), ofMillis(3L), true),
Serdes.String(),
Serdes.String()
).build();

View File

@ -24,7 +24,6 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.easymock.MockType;
@ -35,6 +34,9 @@ import org.junit.runner.RunWith;
import java.util.Collections;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.reset;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
@ -49,9 +51,9 @@ public class TimestampedKeyValueStoreBuilderTest {
@Before
public void setUp() {
EasyMock.expect(supplier.get()).andReturn(inner);
EasyMock.expect(supplier.name()).andReturn("name");
EasyMock.replay(supplier);
expect(supplier.get()).andReturn(inner);
expect(supplier.name()).andReturn("name");
replay(supplier);
builder = new TimestampedKeyValueStoreBuilder<>(
supplier,
Serdes.String(),
@ -114,6 +116,34 @@ public class TimestampedKeyValueStoreBuilderTest {
assertThat(changeLogging.wrapped(), CoreMatchers.equalTo(inner));
}
@Test
public void shouldNotWrapTimestampedByteStore() {
reset(supplier);
expect(supplier.get()).andReturn(new RocksDBTimestampedStore("name"));
expect(supplier.name()).andReturn("name");
replay(supplier);
final TimestampedKeyValueStore<String, String> store = builder
.withLoggingDisabled()
.withCachingDisabled()
.build();
assertThat(((WrappedStateStore) store).wrapped(), instanceOf(RocksDBTimestampedStore.class));
}
@Test
public void shouldWrapPlainKeyValueStoreAsTimestampStore() {
reset(supplier);
expect(supplier.get()).andReturn(new RocksDBStore("name"));
expect(supplier.name()).andReturn("name");
replay(supplier);
final TimestampedKeyValueStore<String, String> store = builder
.withLoggingDisabled()
.withCachingDisabled()
.build();
assertThat(((WrappedStateStore) store).wrapped(), instanceOf(KeyValueToTimestampedKeyValueByteStoreAdapter.class));
}
@SuppressWarnings("all")
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerIfInnerIsNull() {

View File

@ -0,0 +1,183 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.easymock.MockType;
import org.hamcrest.CoreMatchers;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import java.util.Collections;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.reset;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
@RunWith(EasyMockRunner.class)
public class TimestampedWindowStoreBuilderTest {
@Mock(type = MockType.NICE)
private WindowBytesStoreSupplier supplier;
@Mock(type = MockType.NICE)
private WindowStore<Bytes, byte[]> inner;
private TimestampedWindowStoreBuilder<String, String> builder;
@Before
public void setUp() {
expect(supplier.get()).andReturn(inner);
expect(supplier.name()).andReturn("name");
replay(supplier);
builder = new TimestampedWindowStoreBuilder<>(
supplier,
Serdes.String(),
Serdes.String(),
new MockTime());
}
@Test
public void shouldHaveMeteredStoreAsOuterStore() {
final TimestampedWindowStore<String, String> store = builder.build();
assertThat(store, instanceOf(MeteredTimestampedWindowStore.class));
}
@Test
public void shouldHaveChangeLoggingStoreByDefault() {
final TimestampedWindowStore<String, String> store = builder.build();
final StateStore next = ((WrappedStateStore) store).wrapped();
assertThat(next, instanceOf(ChangeLoggingTimestampedWindowBytesStore.class));
}
@Test
public void shouldNotHaveChangeLoggingStoreWhenDisabled() {
final TimestampedWindowStore<String, String> store = builder.withLoggingDisabled().build();
final StateStore next = ((WrappedStateStore) store).wrapped();
assertThat(next, CoreMatchers.equalTo(inner));
}
@Test
public void shouldHaveCachingStoreWhenEnabled() {
final TimestampedWindowStore<String, String> store = builder.withCachingEnabled().build();
final StateStore wrapped = ((WrappedStateStore) store).wrapped();
assertThat(store, instanceOf(MeteredTimestampedWindowStore.class));
assertThat(wrapped, instanceOf(CachingWindowStore.class));
}
@Test
public void shouldHaveChangeLoggingStoreWhenLoggingEnabled() {
final TimestampedWindowStore<String, String> store = builder
.withLoggingEnabled(Collections.emptyMap())
.build();
final StateStore wrapped = ((WrappedStateStore) store).wrapped();
assertThat(store, instanceOf(MeteredTimestampedWindowStore.class));
assertThat(wrapped, instanceOf(ChangeLoggingTimestampedWindowBytesStore.class));
assertThat(((WrappedStateStore) wrapped).wrapped(), CoreMatchers.equalTo(inner));
}
@Test
public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() {
final TimestampedWindowStore<String, String> store = builder
.withLoggingEnabled(Collections.emptyMap())
.withCachingEnabled()
.build();
final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrapped();
final WrappedStateStore changeLogging = (WrappedStateStore) caching.wrapped();
assertThat(store, instanceOf(MeteredTimestampedWindowStore.class));
assertThat(caching, instanceOf(CachingWindowStore.class));
assertThat(changeLogging, instanceOf(ChangeLoggingTimestampedWindowBytesStore.class));
assertThat(changeLogging.wrapped(), CoreMatchers.equalTo(inner));
}
@Test
public void shouldNotWrapTimestampedByteStore() {
reset(supplier);
expect(supplier.get()).andReturn(new RocksDBTimestampedWindowStore(
new RocksDBTimestampedSegmentedBytesStore(
"name",
"metric-scope",
10L,
5L,
new WindowKeySchema()),
false,
1L));
expect(supplier.name()).andReturn("name");
replay(supplier);
final TimestampedWindowStore<String, String> store = builder
.withLoggingDisabled()
.withCachingDisabled()
.build();
assertThat(((WrappedStateStore) store).wrapped(), instanceOf(RocksDBTimestampedWindowStore.class));
}
@Test
public void shouldWrapPlainKeyValueStoreAsTimestampStore() {
reset(supplier);
expect(supplier.get()).andReturn(new RocksDBWindowStore(
new RocksDBSegmentedBytesStore(
"name",
"metric-scope",
10L,
5L,
new WindowKeySchema()),
false,
1L));
expect(supplier.name()).andReturn("name");
replay(supplier);
final TimestampedWindowStore<String, String> store = builder
.withLoggingDisabled()
.withCachingDisabled()
.build();
assertThat(((WrappedStateStore) store).wrapped(), instanceOf(WindowToTimestampedWindowByteStoreAdapter.class));
}
@SuppressWarnings("all")
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerIfInnerIsNull() {
new TimestampedWindowStoreBuilder<>(null, Serdes.String(), Serdes.String(), new MockTime());
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerIfKeySerdeIsNull() {
new TimestampedWindowStoreBuilder<>(supplier, null, Serdes.String(), new MockTime());
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerIfValueSerdeIsNull() {
new TimestampedWindowStoreBuilder<>(supplier, Serdes.String(), null, new MockTime());
}
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerIfTimeIsNull() {
new TimestampedWindowStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), null);
}
}

View File

@ -23,7 +23,6 @@ import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.easymock.MockType;
@ -34,6 +33,8 @@ import org.junit.runner.RunWith;
import java.util.Collections;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
@ -47,16 +48,16 @@ public class WindowStoreBuilderTest {
private WindowStoreBuilder<String, String> builder;
@Before
public void setUp() throws Exception {
EasyMock.expect(supplier.get()).andReturn(inner);
EasyMock.expect(supplier.name()).andReturn("name");
EasyMock.replay(supplier);
public void setUp() {
expect(supplier.get()).andReturn(inner);
expect(supplier.name()).andReturn("name");
replay(supplier);
builder = new WindowStoreBuilder<>(supplier,
builder = new WindowStoreBuilder<>(
supplier,
Serdes.String(),
Serdes.String(),
new MockTime());
}
@Test
@ -76,7 +77,7 @@ public class WindowStoreBuilderTest {
public void shouldNotHaveChangeLoggingStoreWhenDisabled() {
final WindowStore<String, String> store = builder.withLoggingDisabled().build();
final StateStore next = ((WrappedStateStore) store).wrapped();
assertThat(next, CoreMatchers.<StateStore>equalTo(inner));
assertThat(next, CoreMatchers.equalTo(inner));
}
@Test
@ -90,18 +91,18 @@ public class WindowStoreBuilderTest {
@Test
public void shouldHaveChangeLoggingStoreWhenLoggingEnabled() {
final WindowStore<String, String> store = builder
.withLoggingEnabled(Collections.<String, String>emptyMap())
.withLoggingEnabled(Collections.emptyMap())
.build();
final StateStore wrapped = ((WrappedStateStore) store).wrapped();
assertThat(store, instanceOf(MeteredWindowStore.class));
assertThat(wrapped, instanceOf(ChangeLoggingWindowBytesStore.class));
assertThat(((WrappedStateStore) wrapped).wrapped(), CoreMatchers.<StateStore>equalTo(inner));
assertThat(((WrappedStateStore) wrapped).wrapped(), CoreMatchers.equalTo(inner));
}
@Test
public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() {
final WindowStore<String, String> store = builder
.withLoggingEnabled(Collections.<String, String>emptyMap())
.withLoggingEnabled(Collections.emptyMap())
.withCachingEnabled()
.build();
final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrapped();
@ -109,9 +110,10 @@ public class WindowStoreBuilderTest {
assertThat(store, instanceOf(MeteredWindowStore.class));
assertThat(caching, instanceOf(CachingWindowStore.class));
assertThat(changeLogging, instanceOf(ChangeLoggingWindowBytesStore.class));
assertThat(changeLogging.wrapped(), CoreMatchers.<StateStore>equalTo(inner));
assertThat(changeLogging.wrapped(), CoreMatchers.equalTo(inner));
}
@SuppressWarnings("all")
@Test(expected = NullPointerException.class)
public void shouldThrowNullPointerIfInnerIsNull() {
new WindowStoreBuilder<>(null, Serdes.String(), Serdes.String(), new MockTime());