diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index c85fe0314ac..e40251d31c3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -30,6 +30,8 @@ import org.apache.kafka.streams.state.internals.RocksDbKeyValueBytesStoreSupplie import org.apache.kafka.streams.state.internals.RocksDbSessionBytesStoreSupplier; import org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier; import org.apache.kafka.streams.state.internals.SessionStoreBuilder; +import org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilder; +import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder; import org.apache.kafka.streams.state.internals.WindowStoreBuilder; import java.time.Duration; @@ -75,21 +77,47 @@ import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFail * } */ @InterfaceStability.Evolving -public class Stores { +public final class Stores { /** * Create a persistent {@link KeyValueBytesStoreSupplier}. + *

+ * This store supplier can be passed into a {@link #keyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}. + * If you want to create a {@link TimestampedKeyValueStore} you should use + * {@link #persistentTimestampedKeyValueStore(String)} to create a store supplier instead. + * * @param name name of the store (cannot be {@code null}) * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used - * to build a persistent store + * to build a persistent key-value store */ public static KeyValueBytesStoreSupplier persistentKeyValueStore(final String name) { Objects.requireNonNull(name, "name cannot be null"); return new RocksDbKeyValueBytesStoreSupplier(name, false); } + /** + * Create a persistent {@link KeyValueBytesStoreSupplier}. + *

+ * This store supplier can be passed into a + * {@link #timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}. + * If you want to create a {@link KeyValueStore} you should use + * {@link #persistentKeyValueStore(String)} to create a store supplier instead. + * + * @param name name of the store (cannot be {@code null}) + * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used + * to build a persistent key-(timestamp/value) store + */ + public static KeyValueBytesStoreSupplier persistentTimestampedKeyValueStore(final String name) { + Objects.requireNonNull(name, "name cannot be null"); + return new RocksDbKeyValueBytesStoreSupplier(name, true); + } + /** * Create an in-memory {@link KeyValueBytesStoreSupplier}. + *

+ * This store supplier can be passed into a {@link #keyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)} + * or {@link #timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}. + * * @param name name of the store (cannot be {@code null}) * @return an instance of a {@link KeyValueBytesStoreSupplier} than can be used to * build an in-memory store @@ -116,6 +144,10 @@ public class Stores { /** * Create a LRU Map {@link KeyValueBytesStoreSupplier}. + *

+ * This store supplier can be passed into a {@link #keyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)} + * or {@link #timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}. + * * @param name name of the store (cannot be {@code null}) * @param maxCacheSize maximum number of items in the LRU (cannot be negative) * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used to build @@ -145,49 +177,13 @@ public class Stores { } /** - * Create an in-memory {@link WindowBytesStoreSupplier}. + * Create a persistent {@link WindowBytesStoreSupplier}. + * * @param name name of the store (cannot be {@code null}) * @param retentionPeriod length of time to retain data in the store (cannot be negative) - * Note that the retention period must be at least long enough to contain the + * (note that the retention period must be at least long enough to contain the * windowed data's entire life cycle, from window-start through window-end, - * and for the entire grace period. - * @param windowSize size of the windows (cannot be negative) - * @return an instance of {@link WindowBytesStoreSupplier} - * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds} - */ - public static WindowBytesStoreSupplier inMemoryWindowStore(final String name, - final Duration retentionPeriod, - final Duration windowSize, - final boolean retainDuplicates) throws IllegalArgumentException { - Objects.requireNonNull(name, "name cannot be null"); - final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod"); - final long retentionMs = ApiUtils.validateMillisecondDuration(retentionPeriod, rpMsgPrefix); - final String wsMsgPrefix = prepareMillisCheckFailMsgPrefix(windowSize, "windowSize"); - final long windowSizeMs = ApiUtils.validateMillisecondDuration(windowSize, wsMsgPrefix); - - Objects.requireNonNull(name, "name cannot be null"); - if (retentionMs < 0L) { - throw new IllegalArgumentException("retentionPeriod cannot be negative"); - } - if (windowSizeMs < 0L) { - throw new IllegalArgumentException("windowSize cannot be negative"); - } - if (windowSizeMs > retentionMs) { - throw new IllegalArgumentException("The retention period of the window store " - + name + " must be no smaller than its window size. Got size=[" - + windowSize + "], retention=[" + retentionPeriod + "]"); - } - - return new InMemoryWindowBytesStoreSupplier(name, retentionMs, windowSizeMs, retainDuplicates); - } - - /** - * Create a persistent {@link WindowBytesStoreSupplier}. - * @param name name of the store (cannot be {@code null}) - * @param retentionPeriod length of time to retain data in the store (cannot be negative). - * Note that the retention period must be at least long enough to contain the - * windowed data's entire life cycle, from window-start through window-end, - * and for the entire grace period. + * and for the entire grace period) * @param numSegments number of db segments (cannot be zero or negative) * @param windowSize size of the windows that are stored (cannot be negative). Note: the window size * is not stored with the records, so this value is used to compute the keys that @@ -214,17 +210,23 @@ public class Stores { retentionPeriod, windowSize, retainDuplicates, - legacySegmentInterval + legacySegmentInterval, + false ); } /** * Create a persistent {@link WindowBytesStoreSupplier}. + *

+ * This store supplier can be passed into a {@link #windowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)}. + * If you want to create a {@link TimestampedWindowStore} you should use + * {@link #persistentTimestampedWindowStore(String, Duration, Duration, boolean)} to create a store supplier instead. + * * @param name name of the store (cannot be {@code null}) * @param retentionPeriod length of time to retain data in the store (cannot be negative) - * Note that the retention period must be at least long enough to contain the + * (note that the retention period must be at least long enough to contain the * windowed data's entire life cycle, from window-start through window-end, - * and for the entire grace period. + * and for the entire grace period) * @param windowSize size of the windows (cannot be negative) * @param retainDuplicates whether or not to retain duplicates. * @return an instance of {@link WindowBytesStoreSupplier} @@ -234,6 +236,39 @@ public class Stores { final Duration retentionPeriod, final Duration windowSize, final boolean retainDuplicates) throws IllegalArgumentException { + return persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates, false); + } + + /** + * Create a persistent {@link WindowBytesStoreSupplier}. + *

+ * This store supplier can be passed into a + * {@link #timestampedWindowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)}. + * If you want to create a {@link WindowStore} you should use + * {@link #persistentWindowStore(String, Duration, Duration, boolean)} to create a store supplier instead. + * + * @param name name of the store (cannot be {@code null}) + * @param retentionPeriod length of time to retain data in the store (cannot be negative) + * (note that the retention period must be at least long enough to contain the + * windowed data's entire life cycle, from window-start through window-end, + * and for the entire grace period) + * @param windowSize size of the windows (cannot be negative) + * @param retainDuplicates whether or not to retain duplicates. + * @return an instance of {@link WindowBytesStoreSupplier} + * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds} + */ + public static WindowBytesStoreSupplier persistentTimestampedWindowStore(final String name, + final Duration retentionPeriod, + final Duration windowSize, + final boolean retainDuplicates) throws IllegalArgumentException { + return persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates, true); + } + + private static WindowBytesStoreSupplier persistentWindowStore(final String name, + final Duration retentionPeriod, + final Duration windowSize, + final boolean retainDuplicates, + final boolean timestampedStore) { Objects.requireNonNull(name, "name cannot be null"); final String rpMsgPrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod"); final long retentionMs = ApiUtils.validateMillisecondDuration(retentionPeriod, rpMsgPrefix); @@ -242,14 +277,15 @@ public class Stores { final long defaultSegmentInterval = Math.max(retentionMs / 2, 60_000L); - return persistentWindowStore(name, retentionMs, windowSizeMs, retainDuplicates, defaultSegmentInterval); + return persistentWindowStore(name, retentionMs, windowSizeMs, retainDuplicates, defaultSegmentInterval, timestampedStore); } private static WindowBytesStoreSupplier persistentWindowStore(final String name, final long retentionPeriod, final long windowSize, final boolean retainDuplicates, - final long segmentInterval) { + final long segmentInterval, + final boolean timestampedStore) { Objects.requireNonNull(name, "name cannot be null"); if (retentionPeriod < 0L) { throw new IllegalArgumentException("retentionPeriod cannot be negative"); @@ -262,8 +298,8 @@ public class Stores { } if (windowSize > retentionPeriod) { throw new IllegalArgumentException("The retention period of the window store " - + name + " must be no smaller than its window size. Got size=[" - + windowSize + "], retention=[" + retentionPeriod + "]"); + + name + " must be no smaller than its window size. Got size=[" + + windowSize + "], retention=[" + retentionPeriod + "]"); } return new RocksDbWindowBytesStoreSupplier( @@ -272,7 +308,49 @@ public class Stores { segmentInterval, windowSize, retainDuplicates, - false); + timestampedStore); + } + + /** + * Create an in-memory {@link WindowBytesStoreSupplier}. + *

+ * This store supplier can be passed into a {@link #windowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)} or + * {@link #timestampedWindowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)}. + * + * @param name name of the store (cannot be {@code null}) + * @param retentionPeriod length of time to retain data in the store (cannot be negative) + * Note that the retention period must be at least long enough to contain the + * windowed data's entire life cycle, from window-start through window-end, + * and for the entire grace period. + * @param windowSize size of the windows (cannot be negative) + * @return an instance of {@link WindowBytesStoreSupplier} + * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds} + */ + public static WindowBytesStoreSupplier inMemoryWindowStore(final String name, + final Duration retentionPeriod, + final Duration windowSize, + final boolean retainDuplicates) throws IllegalArgumentException { + Objects.requireNonNull(name, "name cannot be null"); + + final String repartitionPeriodErrorMessagePrefix = prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod"); + final long retentionMs = ApiUtils.validateMillisecondDuration(retentionPeriod, repartitionPeriodErrorMessagePrefix); + if (retentionMs < 0L) { + throw new IllegalArgumentException("retentionPeriod cannot be negative"); + } + + final String windowSizeErrorMessagePrefix = prepareMillisCheckFailMsgPrefix(windowSize, "windowSize"); + final long windowSizeMs = ApiUtils.validateMillisecondDuration(windowSize, windowSizeErrorMessagePrefix); + if (windowSizeMs < 0L) { + throw new IllegalArgumentException("windowSize cannot be negative"); + } + + if (windowSizeMs > retentionMs) { + throw new IllegalArgumentException("The retention period of the window store " + + name + " must be no smaller than its window size. Got size=[" + + windowSize + "], retention=[" + retentionPeriod + "]"); + } + + return new InMemoryWindowBytesStoreSupplier(name, retentionMs, windowSizeMs, retainDuplicates); } /** @@ -297,11 +375,12 @@ public class Stores { /** * Create a persistent {@link SessionBytesStoreSupplier}. + * * @param name name of the store (cannot be {@code null}) * @param retentionPeriodMs length ot time to retain data in the store (cannot be negative) - * Note that the retention period must be at least long enough to contain the + * (note that the retention period must be at least long enough to contain the * windowed data's entire life cycle, from window-start through window-end, - * and for the entire grace period. + * and for the entire grace period) * @return an instance of a {@link SessionBytesStoreSupplier} * @deprecated since 2.1 Use {@link Stores#persistentSessionStore(String, Duration)} instead */ @@ -317,6 +396,7 @@ public class Stores { /** * Create a persistent {@link SessionBytesStoreSupplier}. + * * @param name name of the store (cannot be {@code null}) * @param retentionPeriod length ot time to retain data in the store (cannot be negative) * Note that the retention period must be at least long enough to contain the @@ -331,29 +411,15 @@ public class Stores { return persistentSessionStore(name, ApiUtils.validateMillisecondDuration(retentionPeriod, msgPrefix)); } - /** - * Creates a {@link StoreBuilder} that can be used to build a {@link WindowStore}. - * @param supplier a {@link WindowBytesStoreSupplier} (cannot be {@code null}) - * @param keySerde the key serde to use - * @param valueSerde the value serde to use; if the serialized bytes is null for put operations, - * it is treated as delete - * @param key type - * @param value type - * @return an instance of {@link StoreBuilder} than can build a {@link WindowStore} - */ - public static StoreBuilder> windowStoreBuilder(final WindowBytesStoreSupplier supplier, - final Serde keySerde, - final Serde valueSerde) { - Objects.requireNonNull(supplier, "supplier cannot be null"); - return new WindowStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM); - } - - /** - * Creates a {@link StoreBuilder} than can be used to build a {@link KeyValueStore}. + * Creates a {@link StoreBuilder} that can be used to build a {@link KeyValueStore}. + *

+ * The provided supplier should not be a supplier for + * {@link TimestampedKeyValueStore TimestampedKeyValueStores}. + * * @param supplier a {@link KeyValueBytesStoreSupplier} (cannot be {@code null}) * @param keySerde the key serde to use - * @param valueSerde the value serde to use; if the serialized bytes is null for put operations, + * @param valueSerde the value serde to use; if the serialized bytes is {@code null} for put operations, * it is treated as delete * @param key type * @param value type @@ -366,21 +432,86 @@ public class Stores { return new KeyValueStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM); } + /** + * Creates a {@link StoreBuilder} that can be used to build a {@link TimestampedKeyValueStore}. + *

+ * The provided supplier should not be a supplier for + * {@link KeyValueStore KeyValueStores}. For this case, passed in timestamps will be dropped and not stored in the + * key-value-store. On read, no valid timestamp but a dummy timestamp will be returned. + * + * @param supplier a {@link KeyValueBytesStoreSupplier} (cannot be {@code null}) + * @param keySerde the key serde to use + * @param valueSerde the value serde to use; if the serialized bytes is {@code null} for put operations, + * it is treated as delete + * @param key type + * @param value type + * @return an instance of a {@link StoreBuilder} that can build a {@link KeyValueStore} + */ + public static StoreBuilder> timestampedKeyValueStoreBuilder(final KeyValueBytesStoreSupplier supplier, + final Serde keySerde, + final Serde valueSerde) { + Objects.requireNonNull(supplier, "supplier cannot be null"); + return new TimestampedKeyValueStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM); + } + + /** + * Creates a {@link StoreBuilder} that can be used to build a {@link WindowStore}. + *

+ * The provided supplier should not be a supplier for + * {@link TimestampedWindowStore TimestampedWindowStores}. + * + * @param supplier a {@link WindowBytesStoreSupplier} (cannot be {@code null}) + * @param keySerde the key serde to use + * @param valueSerde the value serde to use; if the serialized bytes is {@code null} for put operations, + * it is treated as delete + * @param key type + * @param value type + * @return an instance of {@link StoreBuilder} than can build a {@link WindowStore} + */ + public static StoreBuilder> windowStoreBuilder(final WindowBytesStoreSupplier supplier, + final Serde keySerde, + final Serde valueSerde) { + Objects.requireNonNull(supplier, "supplier cannot be null"); + return new WindowStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM); + } + + /** + * Creates a {@link StoreBuilder} that can be used to build a {@link TimestampedWindowStore}. + *

+ * The provided supplier should not be a supplier for + * {@link WindowStore WindowStores}. For this case, passed in timestamps will be dropped and not stored in the + * windows-store. On read, no valid timestamp but a dummy timestamp will be returned. + * + * @param supplier a {@link WindowBytesStoreSupplier} (cannot be {@code null}) + * @param keySerde the key serde to use + * @param valueSerde the value serde to use; if the serialized bytes is {@code null} for put operations, + * it is treated as delete + * @param key type + * @param value type + * @return an instance of {@link StoreBuilder} that can build a {@link TimestampedWindowStore} + */ + public static StoreBuilder> timestampedWindowStoreBuilder(final WindowBytesStoreSupplier supplier, + final Serde keySerde, + final Serde valueSerde) { + Objects.requireNonNull(supplier, "supplier cannot be null"); + return new TimestampedWindowStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM); + } + /** * Creates a {@link StoreBuilder} that can be used to build a {@link SessionStore}. + * * @param supplier a {@link SessionBytesStoreSupplier} (cannot be {@code null}) * @param keySerde the key serde to use - * @param valueSerde the value serde to use; if the serialized bytes is null for put operations, + * @param valueSerde the value serde to use; if the serialized bytes is {@code null} for put operations, * it is treated as delete * @param key type * @param value type * @return an instance of {@link StoreBuilder} than can build a {@link SessionStore} - * */ + */ public static StoreBuilder> sessionStoreBuilder(final SessionBytesStoreSupplier supplier, final Serde keySerde, final Serde valueSerde) { Objects.requireNonNull(supplier, "supplier cannot be null"); return new SessionStoreBuilder<>(supplier, keySerde, valueSerde, Time.SYSTEM); } -} - +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/state/TimestampedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/TimestampedBytesStore.java index 5b5fbc521af..e609b70a01b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/TimestampedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/TimestampedBytesStore.java @@ -22,6 +22,9 @@ import static org.apache.kafka.clients.consumer.ConsumerRecord.NO_TIMESTAMP; public interface TimestampedBytesStore { static byte[] convertToTimestampedFormat(final byte[] plainValue) { + if (plainValue == null) { + return null; + } return ByteBuffer .allocate(8 + plainValue.length) .putLong(NO_TIMESTAMP) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java new file mode 100644 index 00000000000..62cfac3b09c --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueByteStoreAdapter.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; + +import java.util.List; + +import static org.apache.kafka.streams.state.TimestampedBytesStore.convertToTimestampedFormat; +import static org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.rawValue; + +/** + * This class is used to ensure backward compatibility at DSL level between + * {@link org.apache.kafka.streams.state.TimestampedKeyValueStore} and {@link KeyValueStore}. + *

+ * If a user provides a supplier for plain {@code KeyValueStores} via + * {@link org.apache.kafka.streams.kstream.Materialized#as(KeyValueBytesStoreSupplier)} this adapter is used to + * translate between old a new {@code byte[]} format of the value. + * + * @see KeyValueToTimestampedKeyValueIteratorAdapter + */ +public class KeyValueToTimestampedKeyValueByteStoreAdapter implements KeyValueStore { + final KeyValueStore store; + + KeyValueToTimestampedKeyValueByteStoreAdapter(final KeyValueStore store) { + if (!store.persistent()) { + throw new IllegalArgumentException("Provided store must be a persistent store, but it is not."); + } + this.store = store; + } + + @Override + public void put(final Bytes key, + final byte[] valueWithTimestamp) { + store.put(key, valueWithTimestamp == null ? null : rawValue(valueWithTimestamp)); + } + + @Override + public byte[] putIfAbsent(final Bytes key, + final byte[] valueWithTimestamp) { + return convertToTimestampedFormat(store.putIfAbsent( + key, + valueWithTimestamp == null ? null : rawValue(valueWithTimestamp))); + } + + @Override + public void putAll(final List> entries) { + for (final KeyValue entry : entries) { + final byte[] valueWithTimestamp = entry.value; + store.put(entry.key, valueWithTimestamp == null ? null : rawValue(valueWithTimestamp)); + } + } + + @Override + public byte[] delete(final Bytes key) { + return convertToTimestampedFormat(store.delete(key)); + } + + @Override + public String name() { + return store.name(); + } + + @Override + public void init(final ProcessorContext context, + final StateStore root) { + store.init(context, root); + } + + @Override + public void flush() { + store.flush(); + } + + @Override + public void close() { + store.close(); + } + + @Override + public boolean persistent() { + return true; + } + + @Override + public boolean isOpen() { + return store.isOpen(); + } + + @Override + public byte[] get(final Bytes key) { + return convertToTimestampedFormat(store.get(key)); + } + + @Override + public KeyValueIterator range(final Bytes from, + final Bytes to) { + return new KeyValueToTimestampedKeyValueIteratorAdapter<>(store.range(from, to)); + } + + @Override + public KeyValueIterator all() { + return new KeyValueToTimestampedKeyValueIteratorAdapter<>(store.all()); + } + + @Override + public long approximateNumEntries() { + return store.approximateNumEntries(); + } + +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueIteratorAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueIteratorAdapter.java new file mode 100644 index 00000000000..7bdcb5bbb01 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueToTimestampedKeyValueIteratorAdapter.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.state.KeyValueIterator; + +import static org.apache.kafka.streams.state.TimestampedBytesStore.convertToTimestampedFormat; + +/** + * This class is used to ensure backward compatibility at DSL level between + * {@link org.apache.kafka.streams.state.TimestampedKeyValueStore} and + * {@link org.apache.kafka.streams.state.KeyValueStore}. + * + * @see KeyValueToTimestampedKeyValueByteStoreAdapter + */ +class KeyValueToTimestampedKeyValueIteratorAdapter implements KeyValueIterator { + private final KeyValueIterator innerIterator; + + KeyValueToTimestampedKeyValueIteratorAdapter(final KeyValueIterator innerIterator) { + this.innerIterator = innerIterator; + } + + @Override + public void close() { + innerIterator.close(); + } + + @Override + public K peekNextKey() { + return innerIterator.peekNextKey(); + } + + @Override + public boolean hasNext() { + return innerIterator.hasNext(); + } + + @Override + public KeyValue next() { + final KeyValue plainKeyValue = innerIterator.next(); + return KeyValue.pair(plainKeyValue.key, convertToTimestampedFormat(plainKeyValue.value)); + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java index f52033b9bcb..5466ce864b2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java @@ -22,6 +22,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedBytesStore; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; @@ -42,12 +43,12 @@ import java.util.NoSuchElementException; import java.util.Objects; import static java.util.Arrays.asList; -import static org.apache.kafka.streams.state.internals.StoreProxyUtils.getValueWithUnknownTimestamp; +import static org.apache.kafka.streams.state.TimestampedBytesStore.convertToTimestampedFormat; /** * A persistent key-(value-timestamp) store based on RocksDB. */ -public class RocksDBTimestampedStore extends RocksDBStore { +public class RocksDBTimestampedStore extends RocksDBStore implements TimestampedBytesStore { private static final Logger log = LoggerFactory.getLogger(RocksDBTimestampedStore.class); RocksDBTimestampedStore(final String name) { @@ -160,7 +161,7 @@ public class RocksDBTimestampedStore extends RocksDBStore { final byte[] plainValue = db.get(oldColumnFamily, key); if (plainValue != null) { - final byte[] valueWithUnknownTimestamp = getValueWithUnknownTimestamp(plainValue); + final byte[] valueWithUnknownTimestamp = convertToTimestampedFormat(plainValue); // this does only work, because the changelog topic contains correct data already // for other format changes, we cannot take this short cut and can only migrate data // from old to new store on put() @@ -180,7 +181,7 @@ public class RocksDBTimestampedStore extends RocksDBStore { final byte[] plainValue = db.get(oldColumnFamily, key); if (plainValue != null) { - return getValueWithUnknownTimestamp(plainValue); + return convertToTimestampedFormat(plainValue); } return null; @@ -319,12 +320,12 @@ public class RocksDBTimestampedStore extends RocksDBStore { } } else { if (nextWithTimestamp == null) { - next = KeyValue.pair(new Bytes(nextNoTimestamp), getValueWithUnknownTimestamp(iterNoTimestamp.value())); + next = KeyValue.pair(new Bytes(nextNoTimestamp), convertToTimestampedFormat(iterNoTimestamp.value())); nextNoTimestamp = null; iterNoTimestamp.next(); } else { if (comparator.compare(nextNoTimestamp, nextWithTimestamp) <= 0) { - next = KeyValue.pair(new Bytes(nextNoTimestamp), getValueWithUnknownTimestamp(iterNoTimestamp.value())); + next = KeyValue.pair(new Bytes(nextNoTimestamp), convertToTimestampedFormat(iterNoTimestamp.value())); nextNoTimestamp = null; iterNoTimestamp.next(); } else { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreProxyUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedWindowStore.java similarity index 67% rename from streams/src/main/java/org/apache/kafka/streams/state/internals/StoreProxyUtils.java rename to streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedWindowStore.java index e78b3826949..b96748eed1a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreProxyUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedWindowStore.java @@ -16,21 +16,14 @@ */ package org.apache.kafka.streams.state.internals; -import java.nio.ByteBuffer; +import org.apache.kafka.streams.state.TimestampedBytesStore; -import static org.apache.kafka.clients.consumer.ConsumerRecord.NO_TIMESTAMP; +class RocksDBTimestampedWindowStore extends RocksDBWindowStore implements TimestampedBytesStore { -class StoreProxyUtils { - - static byte[] getValueWithUnknownTimestamp(final byte[] rawValue) { - if (rawValue == null) { - return null; - } - return ByteBuffer - .allocate(8 + rawValue.length) - .putLong(NO_TIMESTAMP) - .put(rawValue) - .array(); + RocksDBTimestampedWindowStore(final SegmentedBytesStore bytesStore, + final boolean retainDuplicates, + final long windowSize) { + super(bytesStore, retainDuplicates, windowSize); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java index b2e8c11a1ee..79f1ee39e04 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java @@ -49,28 +49,27 @@ public class RocksDbWindowBytesStoreSupplier implements WindowBytesStoreSupplier @Override public WindowStore get() { - final SegmentedBytesStore segmentedBytesStore; if (!returnTimestampedStore) { - segmentedBytesStore = new RocksDBSegmentedBytesStore( - name, - metricsScope(), - retentionPeriod, - segmentInterval, - new WindowKeySchema() - ); + return new RocksDBWindowStore( + new RocksDBSegmentedBytesStore( + name, + metricsScope(), + retentionPeriod, + segmentInterval, + new WindowKeySchema()), + retainDuplicates, + windowSize); } else { - segmentedBytesStore = new RocksDBTimestampedSegmentedBytesStore( - name, - metricsScope(), - retentionPeriod, - segmentInterval, - new WindowKeySchema() - ); + return new RocksDBTimestampedWindowStore( + new RocksDBTimestampedSegmentedBytesStore( + name, + metricsScope(), + retentionPeriod, + segmentInterval, + new WindowKeySchema()), + retainDuplicates, + windowSize); } - return new RocksDBWindowStore( - segmentedBytesStore, - retainDuplicates, - windowSize); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java index 5a0bf22e994..f43e4e61294 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilder.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.TimestampedBytesStore; import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.ValueAndTimestamp; @@ -46,8 +47,12 @@ public class TimestampedKeyValueStoreBuilder @Override public TimestampedKeyValueStore build() { + KeyValueStore store = storeSupplier.get(); + if (!(store instanceof TimestampedBytesStore) && store.persistent()) { + store = new KeyValueToTimestampedKeyValueByteStoreAdapter(store); + } return new MeteredTimestampedKeyValueStore<>( - maybeWrapCaching(maybeWrapLogging(storeSupplier.get())), + maybeWrapCaching(maybeWrapLogging(store)), storeSupplier.metricsScope(), time, keySerde, diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java index dcb0d449d3b..2c7c95048ef 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java @@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.state.TimestampedBytesStore; import org.apache.kafka.streams.state.TimestampedWindowStore; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.WindowBytesStoreSupplier; @@ -42,8 +43,12 @@ public class TimestampedWindowStoreBuilder @Override public TimestampedWindowStore build() { + WindowStore store = storeSupplier.get(); + if (!(store instanceof TimestampedBytesStore) && store.persistent()) { + store = new WindowToTimestampedWindowByteStoreAdapter(store); + } return new MeteredTimestampedWindowStore<>( - maybeWrapCaching(maybeWrapLogging(storeSupplier.get())), + maybeWrapCaching(maybeWrapLogging(store)), storeSupplier.windowSize(), storeSupplier.metricsScope(), time, diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java new file mode 100644 index 00000000000..7bd8665bd35 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; + +import java.time.Instant; + +import static org.apache.kafka.streams.state.TimestampedBytesStore.convertToTimestampedFormat; +import static org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.rawValue; + +class WindowToTimestampedWindowByteStoreAdapter implements WindowStore { + final WindowStore store; + + WindowToTimestampedWindowByteStoreAdapter(final WindowStore store) { + if (!store.persistent()) { + throw new IllegalArgumentException("Provided store must be a persistent store, but it is not."); + } + this.store = store; + } + + @Override + public void put(final Bytes key, + final byte[] valueWithTimestamp) { + store.put(key, valueWithTimestamp == null ? null : rawValue(valueWithTimestamp)); + } + + @Override + public void put(final Bytes key, + final byte[] valueWithTimestamp, + final long windowStartTimestamp) { + store.put(key, valueWithTimestamp == null ? null : rawValue(valueWithTimestamp), windowStartTimestamp); + } + + @Override + public byte[] fetch(final Bytes key, + final long time) { + return convertToTimestampedFormat(store.fetch(key, time)); + } + + @Override + @SuppressWarnings("deprecation") + public WindowStoreIterator fetch(final Bytes key, + final long timeFrom, + final long timeTo) { + return new WindowToTimestampedWindowIteratorAdapter(store.fetch(key, timeFrom, timeTo)); + } + + @Override + public WindowStoreIterator fetch(final Bytes key, + final Instant from, + final Instant to) { + return new WindowToTimestampedWindowIteratorAdapter(store.fetch(key, from, to)); + } + + @Override + @SuppressWarnings("deprecation") + public KeyValueIterator, byte[]> fetch(final Bytes from, + final Bytes to, + final long timeFrom, + final long timeTo) { + return new KeyValueToTimestampedKeyValueIteratorAdapter<>(store.fetch(from, to, timeFrom, timeTo)); + } + + @Override + public KeyValueIterator, byte[]> fetch(final Bytes from, + final Bytes to, + final Instant fromTime, + final Instant toTime) { + return new KeyValueToTimestampedKeyValueIteratorAdapter<>(store.fetch(from, to, fromTime, toTime)); + } + + @Override + public KeyValueIterator, byte[]> all() { + return new KeyValueToTimestampedKeyValueIteratorAdapter<>(store.all()); + } + + @Override + @SuppressWarnings("deprecation") + public KeyValueIterator, byte[]> fetchAll(final long timeFrom, + final long timeTo) { + return new KeyValueToTimestampedKeyValueIteratorAdapter<>(store.fetchAll(timeFrom, timeTo)); + } + + @Override + public KeyValueIterator, byte[]> fetchAll(final Instant from, + final Instant to) { + return new KeyValueToTimestampedKeyValueIteratorAdapter<>(store.fetchAll(from, to)); + } + + @Override + public String name() { + return store.name(); + } + + @Override + public void init(final ProcessorContext context, + final StateStore root) { + store.init(context, root); + } + + @Override + public void flush() { + store.flush(); + } + + @Override + public void close() { + store.close(); + } + + @Override + public boolean persistent() { + return true; + } + + @Override + public boolean isOpen() { + return store.isOpen(); + } + + + private static class WindowToTimestampedWindowIteratorAdapter + extends KeyValueToTimestampedKeyValueIteratorAdapter + implements WindowStoreIterator { + + WindowToTimestampedWindowIteratorAdapter(final KeyValueIterator innerIterator) { + super(innerIterator); + } + } + +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java index 4819ac18267..e520df40a53 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/StoresTest.java @@ -36,75 +36,112 @@ import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.hamcrest.core.IsNot.not; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; public class StoresTest { - @Test(expected = NullPointerException.class) + @Test public void shouldThrowIfPersistentKeyValueStoreStoreNameIsNull() { - Stores.persistentKeyValueStore(null); + final Exception e = assertThrows(NullPointerException.class, () -> Stores.persistentKeyValueStore(null)); + assertEquals("name cannot be null", e.getMessage()); } - @Test(expected = NullPointerException.class) + @Test + public void shouldThrowIfPersistentTimestampedKeyValueStoreStoreNameIsNull() { + final Exception e = assertThrows(NullPointerException.class, () -> Stores.persistentTimestampedKeyValueStore(null)); + assertEquals("name cannot be null", e.getMessage()); + } + + @Test public void shouldThrowIfIMemoryKeyValueStoreStoreNameIsNull() { - //noinspection ResultOfMethodCallIgnored - Stores.inMemoryKeyValueStore(null); + final Exception e = assertThrows(NullPointerException.class, () -> Stores.inMemoryKeyValueStore(null)); + assertEquals("name cannot be null", e.getMessage()); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowIfILruMapStoreNameIsNull() { - Stores.lruMap(null, 0); + final Exception e = assertThrows(NullPointerException.class, () -> Stores.lruMap(null, 0)); + assertEquals("name cannot be null", e.getMessage()); } - @Test(expected = IllegalArgumentException.class) + @Test public void shouldThrowIfILruMapStoreCapacityIsNegative() { - Stores.lruMap("anyName", -1); + final Exception e = assertThrows(IllegalArgumentException.class, () -> Stores.lruMap("anyName", -1)); + assertEquals("maxCacheSize cannot be negative", e.getMessage()); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowIfIPersistentWindowStoreStoreNameIsNull() { - Stores.persistentWindowStore(null, ZERO, ZERO, false); + final Exception e = assertThrows(NullPointerException.class, () -> Stores.persistentWindowStore(null, ZERO, ZERO, false)); + assertEquals("name cannot be null", e.getMessage()); } - @Test(expected = IllegalArgumentException.class) + @Test + public void shouldThrowIfIPersistentTimestampedWindowStoreStoreNameIsNull() { + final Exception e = assertThrows(NullPointerException.class, () -> Stores.persistentTimestampedWindowStore(null, ZERO, ZERO, false)); + assertEquals("name cannot be null", e.getMessage()); + } + + @Test public void shouldThrowIfIPersistentWindowStoreRetentionPeriodIsNegative() { - Stores.persistentWindowStore("anyName", ofMillis(-1L), ZERO, false); + final Exception e = assertThrows(IllegalArgumentException.class, () -> Stores.persistentWindowStore("anyName", ofMillis(-1L), ZERO, false)); + assertEquals("retentionPeriod cannot be negative", e.getMessage()); + } + + @Test + public void shouldThrowIfIPersistentTimestampedWindowStoreRetentionPeriodIsNegative() { + final Exception e = assertThrows(IllegalArgumentException.class, () -> Stores.persistentTimestampedWindowStore("anyName", ofMillis(-1L), ZERO, false)); + assertEquals("retentionPeriod cannot be negative", e.getMessage()); } @Deprecated - @Test(expected = IllegalArgumentException.class) + @Test public void shouldThrowIfIPersistentWindowStoreIfNumberOfSegmentsSmallerThanOne() { - Stores.persistentWindowStore("anyName", 0L, 1, 0L, false); + final Exception e = assertThrows(IllegalArgumentException.class, () -> Stores.persistentWindowStore("anyName", 0L, 1, 0L, false)); + assertEquals("numSegments cannot be smaller than 2", e.getMessage()); } - @Test(expected = IllegalArgumentException.class) + @Test public void shouldThrowIfIPersistentWindowStoreIfWindowSizeIsNegative() { - Stores.persistentWindowStore("anyName", ofMillis(0L), ofMillis(-1L), false); + final Exception e = assertThrows(IllegalArgumentException.class, () -> Stores.persistentWindowStore("anyName", ofMillis(0L), ofMillis(-1L), false)); + assertEquals("windowSize cannot be negative", e.getMessage()); } - @Test(expected = NullPointerException.class) + @Test + public void shouldThrowIfIPersistentTimestampedWindowStoreIfWindowSizeIsNegative() { + final Exception e = assertThrows(IllegalArgumentException.class, () -> Stores.persistentTimestampedWindowStore("anyName", ofMillis(0L), ofMillis(-1L), false)); + assertEquals("windowSize cannot be negative", e.getMessage()); + } + + @Test public void shouldThrowIfIPersistentSessionStoreStoreNameIsNull() { - Stores.persistentSessionStore(null, ofMillis(0)); - + final Exception e = assertThrows(NullPointerException.class, () -> Stores.persistentSessionStore(null, ofMillis(0))); + assertEquals("name cannot be null", e.getMessage()); } - @Test(expected = IllegalArgumentException.class) + @Test public void shouldThrowIfIPersistentSessionStoreRetentionPeriodIsNegative() { - Stores.persistentSessionStore("anyName", ofMillis(-1)); + final Exception e = assertThrows(IllegalArgumentException.class, () -> Stores.persistentSessionStore("anyName", ofMillis(-1))); + assertEquals("retentionPeriod cannot be negative", e.getMessage()); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowIfSupplierIsNullForWindowStoreBuilder() { - Stores.windowStoreBuilder(null, Serdes.ByteArray(), Serdes.ByteArray()); + final Exception e = assertThrows(NullPointerException.class, () -> Stores.windowStoreBuilder(null, Serdes.ByteArray(), Serdes.ByteArray())); + assertEquals("supplier cannot be null", e.getMessage()); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowIfSupplierIsNullForKeyValueStoreBuilder() { - Stores.keyValueStoreBuilder(null, Serdes.ByteArray(), Serdes.ByteArray()); + final Exception e = assertThrows(NullPointerException.class, () -> Stores.keyValueStoreBuilder(null, Serdes.ByteArray(), Serdes.ByteArray())); + assertEquals("supplier cannot be null", e.getMessage()); } - @Test(expected = NullPointerException.class) + @Test public void shouldThrowIfSupplierIsNullForSessionStoreBuilder() { - Stores.sessionStoreBuilder(null, Serdes.ByteArray(), Serdes.ByteArray()); + final Exception e = assertThrows(NullPointerException.class, () -> Stores.sessionStoreBuilder(null, Serdes.ByteArray(), Serdes.ByteArray())); + assertEquals("supplier cannot be null", e.getMessage()); } @Test @@ -124,6 +161,11 @@ public class StoresTest { allOf(not(instanceOf(RocksDBTimestampedStore.class)), instanceOf(RocksDBStore.class))); } + @Test + public void shouldCreateRocksDbTimestampedStore() { + assertThat(Stores.persistentTimestampedKeyValueStore("store").get(), instanceOf(RocksDBTimestampedStore.class)); + } + @Test public void shouldCreateRocksDbWindowStore() { final WindowStore store = Stores.persistentWindowStore("store", ofMillis(1L), ofMillis(1L), false).get(); @@ -132,11 +174,49 @@ public class StoresTest { assertThat(wrapped, allOf(not(instanceOf(RocksDBTimestampedSegmentedBytesStore.class)), instanceOf(RocksDBSegmentedBytesStore.class))); } + @Test + public void shouldCreateRocksDbTimestampedWindowStore() { + final WindowStore store = Stores.persistentTimestampedWindowStore("store", ofMillis(1L), ofMillis(1L), false).get(); + final StateStore wrapped = ((WrappedStateStore) store).wrapped(); + assertThat(store, instanceOf(RocksDBWindowStore.class)); + assertThat(wrapped, instanceOf(RocksDBTimestampedSegmentedBytesStore.class)); + } + @Test public void shouldCreateRocksDbSessionStore() { assertThat(Stores.persistentSessionStore("store", ofMillis(1)).get(), instanceOf(RocksDBSessionStore.class)); } + @Test + public void shouldBuildKeyValueStore() { + final KeyValueStore store = Stores.keyValueStoreBuilder( + Stores.persistentKeyValueStore("name"), + Serdes.String(), + Serdes.String() + ).build(); + assertThat(store, not(nullValue())); + } + + @Test + public void shouldBuildTimestampedKeyValueStore() { + final TimestampedKeyValueStore store = Stores.timestampedKeyValueStoreBuilder( + Stores.persistentTimestampedKeyValueStore("name"), + Serdes.String(), + Serdes.String() + ).build(); + assertThat(store, not(nullValue())); + } + + @Test + public void shouldBuildTimestampedKeyValueStoreThatWrapsKeyValueStore() { + final TimestampedKeyValueStore store = Stores.timestampedKeyValueStoreBuilder( + Stores.persistentKeyValueStore("name"), + Serdes.String(), + Serdes.String() + ).build(); + assertThat(store, not(nullValue())); + } + @Test public void shouldBuildWindowStore() { final WindowStore store = Stores.windowStoreBuilder( @@ -148,9 +228,9 @@ public class StoresTest { } @Test - public void shouldBuildKeyValueStore() { - final KeyValueStore store = Stores.keyValueStoreBuilder( - Stores.persistentKeyValueStore("name"), + public void shouldBuildTimestampedWindowStore() { + final TimestampedWindowStore store = Stores.timestampedWindowStoreBuilder( + Stores.persistentTimestampedWindowStore("store", ofMillis(3L), ofMillis(3L), true), Serdes.String(), Serdes.String() ).build(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java index e6dbc6692a6..7b0eb6d6299 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderTest.java @@ -24,7 +24,6 @@ import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.TimestampedKeyValueStore; -import org.easymock.EasyMock; import org.easymock.EasyMockRunner; import org.easymock.Mock; import org.easymock.MockType; @@ -35,6 +34,9 @@ import org.junit.runner.RunWith; import java.util.Collections; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.reset; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsInstanceOf.instanceOf; @@ -49,9 +51,9 @@ public class TimestampedKeyValueStoreBuilderTest { @Before public void setUp() { - EasyMock.expect(supplier.get()).andReturn(inner); - EasyMock.expect(supplier.name()).andReturn("name"); - EasyMock.replay(supplier); + expect(supplier.get()).andReturn(inner); + expect(supplier.name()).andReturn("name"); + replay(supplier); builder = new TimestampedKeyValueStoreBuilder<>( supplier, Serdes.String(), @@ -114,6 +116,34 @@ public class TimestampedKeyValueStoreBuilderTest { assertThat(changeLogging.wrapped(), CoreMatchers.equalTo(inner)); } + @Test + public void shouldNotWrapTimestampedByteStore() { + reset(supplier); + expect(supplier.get()).andReturn(new RocksDBTimestampedStore("name")); + expect(supplier.name()).andReturn("name"); + replay(supplier); + + final TimestampedKeyValueStore store = builder + .withLoggingDisabled() + .withCachingDisabled() + .build(); + assertThat(((WrappedStateStore) store).wrapped(), instanceOf(RocksDBTimestampedStore.class)); + } + + @Test + public void shouldWrapPlainKeyValueStoreAsTimestampStore() { + reset(supplier); + expect(supplier.get()).andReturn(new RocksDBStore("name")); + expect(supplier.name()).andReturn("name"); + replay(supplier); + + final TimestampedKeyValueStore store = builder + .withLoggingDisabled() + .withCachingDisabled() + .build(); + assertThat(((WrappedStateStore) store).wrapped(), instanceOf(KeyValueToTimestampedKeyValueByteStoreAdapter.class)); + } + @SuppressWarnings("all") @Test(expected = NullPointerException.class) public void shouldThrowNullPointerIfInnerIsNull() { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java new file mode 100644 index 00000000000..7be31ea884a --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilderTest.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.WindowBytesStoreSupplier; +import org.apache.kafka.streams.state.WindowStore; +import org.easymock.EasyMockRunner; +import org.easymock.Mock; +import org.easymock.MockType; +import org.hamcrest.CoreMatchers; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.util.Collections; + +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.reset; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsInstanceOf.instanceOf; + +@RunWith(EasyMockRunner.class) +public class TimestampedWindowStoreBuilderTest { + + @Mock(type = MockType.NICE) + private WindowBytesStoreSupplier supplier; + @Mock(type = MockType.NICE) + private WindowStore inner; + private TimestampedWindowStoreBuilder builder; + + @Before + public void setUp() { + expect(supplier.get()).andReturn(inner); + expect(supplier.name()).andReturn("name"); + replay(supplier); + + builder = new TimestampedWindowStoreBuilder<>( + supplier, + Serdes.String(), + Serdes.String(), + new MockTime()); + } + + @Test + public void shouldHaveMeteredStoreAsOuterStore() { + final TimestampedWindowStore store = builder.build(); + assertThat(store, instanceOf(MeteredTimestampedWindowStore.class)); + } + + @Test + public void shouldHaveChangeLoggingStoreByDefault() { + final TimestampedWindowStore store = builder.build(); + final StateStore next = ((WrappedStateStore) store).wrapped(); + assertThat(next, instanceOf(ChangeLoggingTimestampedWindowBytesStore.class)); + } + + @Test + public void shouldNotHaveChangeLoggingStoreWhenDisabled() { + final TimestampedWindowStore store = builder.withLoggingDisabled().build(); + final StateStore next = ((WrappedStateStore) store).wrapped(); + assertThat(next, CoreMatchers.equalTo(inner)); + } + + @Test + public void shouldHaveCachingStoreWhenEnabled() { + final TimestampedWindowStore store = builder.withCachingEnabled().build(); + final StateStore wrapped = ((WrappedStateStore) store).wrapped(); + assertThat(store, instanceOf(MeteredTimestampedWindowStore.class)); + assertThat(wrapped, instanceOf(CachingWindowStore.class)); + } + + @Test + public void shouldHaveChangeLoggingStoreWhenLoggingEnabled() { + final TimestampedWindowStore store = builder + .withLoggingEnabled(Collections.emptyMap()) + .build(); + final StateStore wrapped = ((WrappedStateStore) store).wrapped(); + assertThat(store, instanceOf(MeteredTimestampedWindowStore.class)); + assertThat(wrapped, instanceOf(ChangeLoggingTimestampedWindowBytesStore.class)); + assertThat(((WrappedStateStore) wrapped).wrapped(), CoreMatchers.equalTo(inner)); + } + + @Test + public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() { + final TimestampedWindowStore store = builder + .withLoggingEnabled(Collections.emptyMap()) + .withCachingEnabled() + .build(); + final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrapped(); + final WrappedStateStore changeLogging = (WrappedStateStore) caching.wrapped(); + assertThat(store, instanceOf(MeteredTimestampedWindowStore.class)); + assertThat(caching, instanceOf(CachingWindowStore.class)); + assertThat(changeLogging, instanceOf(ChangeLoggingTimestampedWindowBytesStore.class)); + assertThat(changeLogging.wrapped(), CoreMatchers.equalTo(inner)); + } + + @Test + public void shouldNotWrapTimestampedByteStore() { + reset(supplier); + expect(supplier.get()).andReturn(new RocksDBTimestampedWindowStore( + new RocksDBTimestampedSegmentedBytesStore( + "name", + "metric-scope", + 10L, + 5L, + new WindowKeySchema()), + false, + 1L)); + expect(supplier.name()).andReturn("name"); + replay(supplier); + + final TimestampedWindowStore store = builder + .withLoggingDisabled() + .withCachingDisabled() + .build(); + assertThat(((WrappedStateStore) store).wrapped(), instanceOf(RocksDBTimestampedWindowStore.class)); + } + + @Test + public void shouldWrapPlainKeyValueStoreAsTimestampStore() { + reset(supplier); + expect(supplier.get()).andReturn(new RocksDBWindowStore( + new RocksDBSegmentedBytesStore( + "name", + "metric-scope", + 10L, + 5L, + new WindowKeySchema()), + false, + 1L)); + expect(supplier.name()).andReturn("name"); + replay(supplier); + + final TimestampedWindowStore store = builder + .withLoggingDisabled() + .withCachingDisabled() + .build(); + assertThat(((WrappedStateStore) store).wrapped(), instanceOf(WindowToTimestampedWindowByteStoreAdapter.class)); + } + + @SuppressWarnings("all") + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerIfInnerIsNull() { + new TimestampedWindowStoreBuilder<>(null, Serdes.String(), Serdes.String(), new MockTime()); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerIfKeySerdeIsNull() { + new TimestampedWindowStoreBuilder<>(supplier, null, Serdes.String(), new MockTime()); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerIfValueSerdeIsNull() { + new TimestampedWindowStoreBuilder<>(supplier, Serdes.String(), null, new MockTime()); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerIfTimeIsNull() { + new TimestampedWindowStoreBuilder<>(supplier, Serdes.String(), Serdes.String(), null); + } + +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java index 022f6ddd85f..bf29d4ae52d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowStoreBuilderTest.java @@ -23,7 +23,6 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.WindowBytesStoreSupplier; import org.apache.kafka.streams.state.WindowStore; -import org.easymock.EasyMock; import org.easymock.EasyMockRunner; import org.easymock.Mock; import org.easymock.MockType; @@ -34,6 +33,8 @@ import org.junit.runner.RunWith; import java.util.Collections; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsInstanceOf.instanceOf; @@ -47,16 +48,16 @@ public class WindowStoreBuilderTest { private WindowStoreBuilder builder; @Before - public void setUp() throws Exception { - EasyMock.expect(supplier.get()).andReturn(inner); - EasyMock.expect(supplier.name()).andReturn("name"); - EasyMock.replay(supplier); - - builder = new WindowStoreBuilder<>(supplier, - Serdes.String(), - Serdes.String(), - new MockTime()); + public void setUp() { + expect(supplier.get()).andReturn(inner); + expect(supplier.name()).andReturn("name"); + replay(supplier); + builder = new WindowStoreBuilder<>( + supplier, + Serdes.String(), + Serdes.String(), + new MockTime()); } @Test @@ -76,7 +77,7 @@ public class WindowStoreBuilderTest { public void shouldNotHaveChangeLoggingStoreWhenDisabled() { final WindowStore store = builder.withLoggingDisabled().build(); final StateStore next = ((WrappedStateStore) store).wrapped(); - assertThat(next, CoreMatchers.equalTo(inner)); + assertThat(next, CoreMatchers.equalTo(inner)); } @Test @@ -90,18 +91,18 @@ public class WindowStoreBuilderTest { @Test public void shouldHaveChangeLoggingStoreWhenLoggingEnabled() { final WindowStore store = builder - .withLoggingEnabled(Collections.emptyMap()) + .withLoggingEnabled(Collections.emptyMap()) .build(); final StateStore wrapped = ((WrappedStateStore) store).wrapped(); assertThat(store, instanceOf(MeteredWindowStore.class)); assertThat(wrapped, instanceOf(ChangeLoggingWindowBytesStore.class)); - assertThat(((WrappedStateStore) wrapped).wrapped(), CoreMatchers.equalTo(inner)); + assertThat(((WrappedStateStore) wrapped).wrapped(), CoreMatchers.equalTo(inner)); } @Test public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() { final WindowStore store = builder - .withLoggingEnabled(Collections.emptyMap()) + .withLoggingEnabled(Collections.emptyMap()) .withCachingEnabled() .build(); final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrapped(); @@ -109,9 +110,10 @@ public class WindowStoreBuilderTest { assertThat(store, instanceOf(MeteredWindowStore.class)); assertThat(caching, instanceOf(CachingWindowStore.class)); assertThat(changeLogging, instanceOf(ChangeLoggingWindowBytesStore.class)); - assertThat(changeLogging.wrapped(), CoreMatchers.equalTo(inner)); + assertThat(changeLogging.wrapped(), CoreMatchers.equalTo(inner)); } + @SuppressWarnings("all") @Test(expected = NullPointerException.class) public void shouldThrowNullPointerIfInnerIsNull() { new WindowStoreBuilder<>(null, Serdes.String(), Serdes.String(), new MockTime());